From 0c7b16e7fdeef43ab80f13db40c134e1388785a4 Mon Sep 17 00:00:00 2001 From: ydrozhdzhal Date: Fri, 5 Nov 2021 12:05:25 +0200 Subject: [PATCH] Added metrics cardinality handling for Google Cloud Spanner receiver - part 2. This part contains item filter factory which is responsible for calculation of cardinality limits for metrics and construction of item filters for each metric. --- receiver/googlecloudspannerreceiver/go.mod | 1 + receiver/googlecloudspannerreceiver/go.sum | 1 + .../internal/filterfactory/filterbuilder.go | 150 ++++++++++ .../filterfactory/filterbuilder_test.go | 275 ++++++++++++++++++ .../filterfactory/itemfilterfactory.go | 102 +++++++ .../filterfactory/itemfilterfactory_test.go | 152 ++++++++++ .../filterfactory/testhelpers_test.go | 68 +++++ 7 files changed, 749 insertions(+) create mode 100644 receiver/googlecloudspannerreceiver/internal/filterfactory/filterbuilder.go create mode 100644 receiver/googlecloudspannerreceiver/internal/filterfactory/filterbuilder_test.go create mode 100644 receiver/googlecloudspannerreceiver/internal/filterfactory/itemfilterfactory.go create mode 100644 receiver/googlecloudspannerreceiver/internal/filterfactory/itemfilterfactory_test.go create mode 100644 receiver/googlecloudspannerreceiver/internal/filterfactory/testhelpers_test.go diff --git a/receiver/googlecloudspannerreceiver/go.mod b/receiver/googlecloudspannerreceiver/go.mod index 14b176804419..e45ae3ed18d7 100644 --- a/receiver/googlecloudspannerreceiver/go.mod +++ b/receiver/googlecloudspannerreceiver/go.mod @@ -48,6 +48,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.6.1 // indirect github.com/spf13/cast v1.4.1 // indirect + github.com/stretchr/objx v0.1.0 // indirect go.opencensus.io v0.23.0 // indirect go.opentelemetry.io/otel v1.1.0 // indirect go.opentelemetry.io/otel/metric v0.24.0 // indirect diff --git a/receiver/googlecloudspannerreceiver/go.sum b/receiver/googlecloudspannerreceiver/go.sum index 89a9a8392cd3..7655294b1f70 100644 --- a/receiver/googlecloudspannerreceiver/go.sum +++ b/receiver/googlecloudspannerreceiver/go.sum @@ -390,6 +390,7 @@ github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0 github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/receiver/googlecloudspannerreceiver/internal/filterfactory/filterbuilder.go b/receiver/googlecloudspannerreceiver/internal/filterfactory/filterbuilder.go new file mode 100644 index 000000000000..32f21836c9b5 --- /dev/null +++ b/receiver/googlecloudspannerreceiver/internal/filterfactory/filterbuilder.go @@ -0,0 +1,150 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filterfactory + +import ( + "errors" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filter" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata" +) + +type filterBuilder struct { + logger *zap.Logger + config *ItemFilterFactoryConfig +} + +func (b filterBuilder) buildFilterByMetricZeroTotalLimit() map[string]filter.ItemFilter { + filterByMetric := make(map[string]filter.ItemFilter) + nopFilter := filter.NewNopItemCardinalityFilter() + + for _, metadataItem := range b.config.MetadataItems { + for _, metricValueMetadata := range metadataItem.QueryMetricValuesMetadata { + metricFullName := metadataItem.MetricNamePrefix + metricValueMetadata.Name() + filterByMetric[metricFullName] = nopFilter + } + } + + return filterByMetric +} + +func (b filterBuilder) buildFilterByMetricPositiveTotalLimit() (map[string]filter.ItemFilter, error) { + filterByMetric := make(map[string]filter.ItemFilter) + groupedItems := groupByCardinality(b.config.MetadataItems) + + // Handle metric groups with low cardinality + lowCardinalityGroups := groupedItems[false] + newTotalLimit, err := b.handleLowCardinalityGroups(lowCardinalityGroups, b.config.TotalLimit, filterByMetric) + if err != nil { + return nil, err + } + + // Handle metric groups with high cardinality + highCardinalityGroups := groupedItems[true] + newTotalLimit, err = b.handleHighCardinalityGroups(highCardinalityGroups, newTotalLimit, filterByMetric) + if err != nil { + return nil, err + } + + b.logger.Debug("Remaining total limit after cardinality limits calculation", + zap.Int("remainingTotalLimit", newTotalLimit)) + + return filterByMetric, nil +} + +func (b filterBuilder) handleLowCardinalityGroups(groups []*metadata.MetricsMetadata, remainingTotalLimit int, + filterByMetric map[string]filter.ItemFilter) (int, error) { + + if len(groups) == 0 { + return remainingTotalLimit, nil + } + + limitPerMetricByTimestamp := b.config.ProjectAmount * b.config.InstanceAmount * b.config.DatabaseAmount + + // For low cardinality metrics total limit is equal to limit by timestamp + b.logger.Debug("Calculated cardinality limits for low cardinality metric group", + zap.Int("limitPerMetricByTimestamp", limitPerMetricByTimestamp)) + + return b.constructFiltersForGroups(limitPerMetricByTimestamp, limitPerMetricByTimestamp, groups, remainingTotalLimit, filterByMetric) +} + +func (b filterBuilder) handleHighCardinalityGroups(groups []*metadata.MetricsMetadata, remainingTotalLimit int, + filterByMetric map[string]filter.ItemFilter) (int, error) { + + if len(groups) == 0 { + return remainingTotalLimit, nil + } + + totalLimitPerMetric := remainingTotalLimit / countMetricsInGroups(groups) + limitPerMetricByTimestamp := totalLimitPerMetric / defaultMetricDataPointsAmountInPeriod + + b.logger.Debug("Calculated cardinality limits for high cardinality metric group", + zap.Int("limitPerMetricByTimestamp", limitPerMetricByTimestamp), + zap.Int("totalLimitPerMetric", totalLimitPerMetric)) + + if limitPerMetricByTimestamp < 1 { + return remainingTotalLimit, errors.New("limit per metric per timestamp for high cardinality metrics is lower than 1") + } + + return b.constructFiltersForGroups(totalLimitPerMetric, limitPerMetricByTimestamp, groups, remainingTotalLimit, filterByMetric) +} + +func (b filterBuilder) constructFiltersForGroups(totalLimitPerMetric int, limitPerMetricByTimestamp int, + groups []*metadata.MetricsMetadata, remainingTotalLimit int, filterByMetric map[string]filter.ItemFilter) (int, error) { + + newTotalLimit := remainingTotalLimit + + for _, metadataItem := range groups { + for _, metricValueMetadata := range metadataItem.QueryMetricValuesMetadata { + newTotalLimit -= totalLimitPerMetric + metricFullName := metadataItem.MetricNamePrefix + metricValueMetadata.Name() + + b.logger.Debug("Setting cardinality limits for metric", + zap.String("metricFullName", metricFullName), + zap.Int("limitPerMetricByTimestamp", limitPerMetricByTimestamp), + zap.Int("totalLimitPerMetric", totalLimitPerMetric), + zap.Int("remainingTotalLimit", newTotalLimit)) + + itemFilter, err := filter.NewItemCardinalityFilter(metricFullName, totalLimitPerMetric, + limitPerMetricByTimestamp, defaultItemActivityPeriod, b.logger) + if err != nil { + return remainingTotalLimit, err + } + filterByMetric[metricFullName] = itemFilter + } + } + + return newTotalLimit, nil +} + +func countMetricsInGroups(metadataItems []*metadata.MetricsMetadata) (amount int) { + for _, metadataItem := range metadataItems { + amount += len(metadataItem.QueryMetricValuesMetadata) + } + + return amount +} + +func groupByCardinality(metadataItems []*metadata.MetricsMetadata) map[bool][]*metadata.MetricsMetadata { + groupedItems := make(map[bool][]*metadata.MetricsMetadata) + + for _, metadataItem := range metadataItems { + groupedItems[metadataItem.HighCardinality] = append(groupedItems[metadataItem.HighCardinality], metadataItem) + } + + return groupedItems +} diff --git a/receiver/googlecloudspannerreceiver/internal/filterfactory/filterbuilder_test.go b/receiver/googlecloudspannerreceiver/internal/filterfactory/filterbuilder_test.go new file mode 100644 index 000000000000..a3e247f21827 --- /dev/null +++ b/receiver/googlecloudspannerreceiver/internal/filterfactory/filterbuilder_test.go @@ -0,0 +1,275 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filterfactory + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filter" +) + +func TestFilterBuilder_BuildFilterByMetricZeroTotalLimit(t *testing.T) { + logger := zaptest.NewLogger(t) + metricPrefixes := []string{prefix1, prefix2} + prefixHighCardinality := []bool{true, true} + metadataItems := generateMetadataItems(metricPrefixes, prefixHighCardinality) + config := &ItemFilterFactoryConfig{ + MetadataItems: metadataItems, + } + nopItemFilter := filter.NewNopItemCardinalityFilter() + builder := filterBuilder{ + logger: logger, + config: config, + } + + result := builder.buildFilterByMetricZeroTotalLimit() + + // Because we have 2 groups and each group has 2 metrics + assert.Equal(t, len(metricPrefixes)*2, len(result)) + for _, metadataItem := range metadataItems { + for _, metricValueMetadata := range metadataItem.QueryMetricValuesMetadata { + f, exists := result[metadataItem.MetricNamePrefix+metricValueMetadata.Name()] + assert.True(t, exists) + assert.Equal(t, nopItemFilter, f) + } + } +} + +func TestFilterBuilder_BuildFilterByMetricPositiveTotalLimit(t *testing.T) { + logger := zaptest.NewLogger(t) + testCases := map[string]struct { + metricPrefixes []string + prefixHighCardinality []bool + totalLimit int + projectAmount int + instanceAmount int + databaseAmount int + expectedHighCardinalityTotalLimit int + expectedHighCardinalityLimitByTimestamp int + expectError bool + }{ + "Happy path with 2 high cardinality groups": {[]string{prefix1, prefix2}, []bool{true, true}, 200 * defaultMetricDataPointsAmountInPeriod, 1, 2, 5, 72000, 50, false}, + "Happy path with 2 low cardinality groups": {[]string{prefix1, prefix2}, []bool{false, false}, 200, 1, 2, 5, 0, 0, false}, + "Happy path with 1 low and 1 high cardinality groups": {[]string{prefix1, prefix2}, []bool{false, true}, 200*defaultMetricDataPointsAmountInPeriod + 20, 1, 2, 5, 144000, 100, false}, + "Error when limit by timestamp is lower than 1 for high cardinality groups": {[]string{prefix1, prefix2}, []bool{true, true}, 200, 1, 2, 5, 0, 0, true}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + metadataItems := generateMetadataItems(testCase.metricPrefixes, testCase.prefixHighCardinality) + config := &ItemFilterFactoryConfig{ + MetadataItems: metadataItems, + TotalLimit: testCase.totalLimit, + ProjectAmount: testCase.projectAmount, + InstanceAmount: testCase.instanceAmount, + DatabaseAmount: testCase.databaseAmount, + } + builder := filterBuilder{ + logger: logger, + config: config, + } + + result, err := builder.buildFilterByMetricPositiveTotalLimit() + if testCase.expectError { + require.Error(t, err) + return + } + require.NoError(t, err) + + // Because we have 2 groups and each group has 2 metrics + assert.Equal(t, len(testCase.metricPrefixes)*2, len(result)) + for _, metadataItem := range metadataItems { + for _, metricValueMetadata := range metadataItem.QueryMetricValuesMetadata { + f, exists := result[metadataItem.MetricNamePrefix+metricValueMetadata.Name()] + assert.True(t, exists) + if metadataItem.HighCardinality { + assert.Equal(t, testCase.expectedHighCardinalityTotalLimit, f.TotalLimit()) + assert.Equal(t, testCase.expectedHighCardinalityLimitByTimestamp, f.LimitByTimestamp()) + } else { + // For low cardinality group both limits are equal to projectAmount * instanceAmount * databaseAmount + expectedLimit := testCase.projectAmount * testCase.instanceAmount * testCase.databaseAmount + assert.Equal(t, expectedLimit, f.TotalLimit()) + assert.Equal(t, expectedLimit, f.LimitByTimestamp()) + } + } + } + }) + } +} + +func TestFilterBuilder_HandleLowCardinalityGroups(t *testing.T) { + logger := zaptest.NewLogger(t) + testCases := map[string]struct { + metricPrefixes []string + prefixHighCardinality []bool + totalLimit int + projectAmount int + instanceAmount int + databaseAmount int + expectedRemainingTotalLimit int + }{ + "With 2 low cardinality groups": {[]string{prefix1, prefix2}, []bool{false, false}, 50, 1, 2, 5, 10}, + "With 0 low cardinality groups": {[]string{}, []bool{}, 50, 1, 2, 5, 50}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + metadataItems := generateMetadataItems(testCase.metricPrefixes, testCase.prefixHighCardinality) + config := &ItemFilterFactoryConfig{ + MetadataItems: metadataItems, + TotalLimit: testCase.totalLimit, + ProjectAmount: testCase.projectAmount, + InstanceAmount: testCase.instanceAmount, + DatabaseAmount: testCase.databaseAmount, + } + builder := filterBuilder{ + logger: logger, + config: config, + } + + filterByMetric := make(map[string]filter.ItemFilter) + remainingTotalLimit, err := builder.handleLowCardinalityGroups(metadataItems, testCase.totalLimit, filterByMetric) + require.NoError(t, err) + + // Because we have 2 groups and each group has 2 metrics + assert.Equal(t, len(testCase.metricPrefixes)*2, len(filterByMetric)) + for _, metadataItem := range metadataItems { + for _, metricValueMetadata := range metadataItem.QueryMetricValuesMetadata { + f, exists := filterByMetric[metadataItem.MetricNamePrefix+metricValueMetadata.Name()] + assert.True(t, exists) + // For low cardinality group both limits are equal to projectAmount * instanceAmount * databaseAmount + expectedLimit := testCase.projectAmount * testCase.instanceAmount * testCase.databaseAmount + assert.Equal(t, expectedLimit, f.TotalLimit()) + assert.Equal(t, expectedLimit, f.LimitByTimestamp()) + assert.Equal(t, testCase.expectedRemainingTotalLimit, remainingTotalLimit) + } + } + }) + } +} + +func TestFilterBuilder_HandleHighCardinalityGroups(t *testing.T) { + logger := zaptest.NewLogger(t) + testCases := map[string]struct { + metricPrefixes []string + prefixHighCardinality []bool + totalLimit int + expectedHighCardinalityTotalLimit int + expectedHighCardinalityLimitByTimestamp int + expectedRemainingTotalLimit int + expectError bool + }{ + "With 2 high cardinality groups": {[]string{prefix1, prefix2}, []bool{true, true}, 200 * defaultMetricDataPointsAmountInPeriod, 72000, 50, 0, false}, + "With zero high cardinality groups": {[]string{}, []bool{}, 200, 0, 0, 200, false}, + "Error when limit by timestamp is lower than 1 for high cardinality groups": {[]string{prefix1, prefix2}, []bool{true, true}, 200, 0, 0, 200, true}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + metadataItems := generateMetadataItems(testCase.metricPrefixes, testCase.prefixHighCardinality) + config := &ItemFilterFactoryConfig{ + MetadataItems: metadataItems, + TotalLimit: testCase.totalLimit, + ProjectAmount: 1, + InstanceAmount: 2, + DatabaseAmount: 5, + } + builder := filterBuilder{ + logger: logger, + config: config, + } + filterByMetric := make(map[string]filter.ItemFilter) + remainingTotalLimit, err := builder.handleHighCardinalityGroups(metadataItems, testCase.totalLimit, filterByMetric) + if testCase.expectError { + require.Error(t, err) + return + } + require.NoError(t, err) + + // Because we have 2 groups and each group has 2 metrics + assert.Equal(t, len(testCase.metricPrefixes)*2, len(filterByMetric)) + for _, metadataItem := range metadataItems { + for _, metricValueMetadata := range metadataItem.QueryMetricValuesMetadata { + f, exists := filterByMetric[metadataItem.MetricNamePrefix+metricValueMetadata.Name()] + assert.True(t, exists) + assert.Equal(t, testCase.expectedHighCardinalityTotalLimit, f.TotalLimit()) + assert.Equal(t, testCase.expectedHighCardinalityLimitByTimestamp, f.LimitByTimestamp()) + assert.Equal(t, testCase.expectedRemainingTotalLimit, remainingTotalLimit) + } + } + }) + } +} + +func TestFilterBuilder_TestConstructFiltersForGroups(t *testing.T) { + logger := zaptest.NewLogger(t) + metricPrefixes := []string{prefix1, prefix2} + prefixHighCardinality := []bool{true, true} + metadataItems := generateMetadataItems(metricPrefixes, prefixHighCardinality) + config := &ItemFilterFactoryConfig{ + MetadataItems: metadataItems, + } + builder := filterBuilder{ + logger: logger, + config: config, + } + filterByMetric := make(map[string]filter.ItemFilter) + const totalLimitPerMetric, limitPerMetricByTimestamp, remainingTotalLimit, expectedRemainingTotalLimit = 50, 10, 200, 0 + + result, err := builder.constructFiltersForGroups(totalLimitPerMetric, limitPerMetricByTimestamp, metadataItems, + remainingTotalLimit, filterByMetric) + require.NoError(t, err) + + // Because we have 2 groups and each group has 2 metrics + assert.Equal(t, len(metricPrefixes)*2, len(filterByMetric)) + for _, metadataItem := range metadataItems { + for _, metricValueMetadata := range metadataItem.QueryMetricValuesMetadata { + f, exists := filterByMetric[metadataItem.MetricNamePrefix+metricValueMetadata.Name()] + assert.True(t, exists) + assert.Equal(t, totalLimitPerMetric, f.TotalLimit()) + assert.Equal(t, limitPerMetricByTimestamp, f.LimitByTimestamp()) + assert.Equal(t, expectedRemainingTotalLimit, result) + } + } +} + +func TestCountMetricsInGroups(t *testing.T) { + metricPrefixes := []string{prefix1, prefix2} + prefixHighCardinality := []bool{true, true} + metadataItems := generateMetadataItems(metricPrefixes, prefixHighCardinality) + + assert.Equal(t, 4, countMetricsInGroups(metadataItems)) +} + +func TestGroupByCardinality(t *testing.T) { + metricPrefixes := []string{"prefix1-", "prefix2-"} + prefixHighCardinality := []bool{false, true} + metadataItems := generateMetadataItems(metricPrefixes, prefixHighCardinality) + + result := groupByCardinality(metadataItems) + + assert.Equal(t, 2, len(result)) + + for _, metadataItem := range metadataItems { + groups, exists := result[metadataItem.HighCardinality] + assert.True(t, exists) + assert.Equal(t, 1, len(groups)) + assert.Equal(t, metadataItem, groups[0]) + } +} diff --git a/receiver/googlecloudspannerreceiver/internal/filterfactory/itemfilterfactory.go b/receiver/googlecloudspannerreceiver/internal/filterfactory/itemfilterfactory.go new file mode 100644 index 000000000000..bbb480b293ce --- /dev/null +++ b/receiver/googlecloudspannerreceiver/internal/filterfactory/itemfilterfactory.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filterfactory + +import ( + "errors" + "fmt" + "time" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filter" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata" +) + +const ( + defaultMetricDataPointsAmountInPeriod = 24 * 60 + defaultItemActivityPeriod = 24 * time.Hour +) + +type itemFilterFactory struct { + filterByMetric map[string]filter.ItemFilter +} + +type ItemFilterFactoryConfig struct { + MetadataItems []*metadata.MetricsMetadata + TotalLimit int + ProjectAmount int + InstanceAmount int + DatabaseAmount int +} + +func NewItemFilterResolver(logger *zap.Logger, config *ItemFilterFactoryConfig) (filter.ItemFilterResolver, error) { + if err := config.validate(); err != nil { + return nil, err + } + + builder := filterBuilder{ + logger: logger, + config: config, + } + + if config.TotalLimit == 0 { + return &itemFilterFactory{ + filterByMetric: builder.buildFilterByMetricZeroTotalLimit(), + }, nil + } + + filterByMetric, err := builder.buildFilterByMetricPositiveTotalLimit() + if err != nil { + return nil, err + } + + return &itemFilterFactory{ + filterByMetric: filterByMetric, + }, nil +} + +func (config *ItemFilterFactoryConfig) validate() error { + if len(config.MetadataItems) == 0 { + return errors.New("metadata items cannot be empty or nil") + } + + if config.TotalLimit != 0 && config.TotalLimit <= (config.ProjectAmount*config.InstanceAmount*config.DatabaseAmount) { + return errors.New("total limit is too low and doesn't cover configured projects * instances * databases") + } + + return nil +} + +func (f *itemFilterFactory) Resolve(metricFullName string) (filter.ItemFilter, error) { + itemFilter, exists := f.filterByMetric[metricFullName] + + if !exists { + return nil, fmt.Errorf("can't find item filter for metric with full name %q", metricFullName) + } + + return itemFilter, nil +} + +func (f *itemFilterFactory) Shutdown() error { + for _, itemFilter := range f.filterByMetric { + err := itemFilter.Shutdown() + if err != nil { + return err + } + } + + return nil +} diff --git a/receiver/googlecloudspannerreceiver/internal/filterfactory/itemfilterfactory_test.go b/receiver/googlecloudspannerreceiver/internal/filterfactory/itemfilterfactory_test.go new file mode 100644 index 000000000000..df4ebe659ac7 --- /dev/null +++ b/receiver/googlecloudspannerreceiver/internal/filterfactory/itemfilterfactory_test.go @@ -0,0 +1,152 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filterfactory + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filter" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata" +) + +func TestNewItemFilterResolver(t *testing.T) { + logger := zaptest.NewLogger(t) + metricPrefixes := []string{prefix1, prefix2} + prefixHighCardinality := []bool{true, true} + metadataItems := generateMetadataItems(metricPrefixes, prefixHighCardinality) + testCases := map[string]struct { + totalLimit int + expectError bool + }{ + "Total limit is zero": {0, false}, + "Total limit is positive": {200 * defaultMetricDataPointsAmountInPeriod, false}, + "Total limit is lover then product of amounts": {3, true}, + "Error when limit by timestamp is lower than 1 for high cardinality groups": {20, true}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + config := &ItemFilterFactoryConfig{ + MetadataItems: metadataItems, + TotalLimit: testCase.totalLimit, + ProjectAmount: 1, + InstanceAmount: 2, + DatabaseAmount: 5, + } + + factory, err := NewItemFilterResolver(logger, config) + + if testCase.expectError { + require.Error(t, err) + require.Nil(t, factory) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestItemFilterFactoryConfig_Validate(t *testing.T) { + testCases := map[string]struct { + metadataItems []*metadata.MetricsMetadata + totalLimit int + projectAmount int + instanceAmount int + databaseAmount int + expectError bool + }{ + "No metadata items": {[]*metadata.MetricsMetadata{}, 10, 1, 1, 1, true}, + "Total limit is zero": {[]*metadata.MetricsMetadata{{}}, 0, 1, 1, 1, false}, + "Total limit is lover then product of amounts": {[]*metadata.MetricsMetadata{{}}, 3, 1, 2, 3, true}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + config := &ItemFilterFactoryConfig{ + MetadataItems: testCase.metadataItems, + TotalLimit: testCase.totalLimit, + ProjectAmount: testCase.projectAmount, + InstanceAmount: testCase.instanceAmount, + DatabaseAmount: testCase.databaseAmount, + } + + err := config.validate() + + if testCase.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestItemFilterFactory_Resolve(t *testing.T) { + itemFilter := filter.NewNopItemCardinalityFilter() + testCases := map[string]struct { + filterByMetric map[string]filter.ItemFilter + expectError bool + }{ + "Filter cannot be resolved": {map[string]filter.ItemFilter{}, true}, + "Filter can be resolved": {map[string]filter.ItemFilter{metricFullName: itemFilter}, false}, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + factory := &itemFilterFactory{ + filterByMetric: testCase.filterByMetric, + } + + resolvedFilter, err := factory.Resolve(metricFullName) + + if testCase.expectError { + require.Error(t, err) + require.Nil(t, resolvedFilter) + } else { + require.NoError(t, err) + assert.Equal(t, itemFilter, resolvedFilter) + } + }) + } +} + +func TestItemFilterFactory_Shutdown(t *testing.T) { + testCases := map[string]struct { + expectedError error + }{ + "Error": {errors.New("error on shutdown")}, + "Happy path": {nil}, + } + + for name, testCase := range testCases { + mf := &mockFilter{} + t.Run(name, func(t *testing.T) { + factory := &itemFilterFactory{ + filterByMetric: map[string]filter.ItemFilter{metricFullName: mf}, + } + + mf.On("Shutdown").Return(testCase.expectedError) + + _ = factory.Shutdown() + + mf.AssertExpectations(t) + }) + } +} diff --git a/receiver/googlecloudspannerreceiver/internal/filterfactory/testhelpers_test.go b/receiver/googlecloudspannerreceiver/internal/filterfactory/testhelpers_test.go new file mode 100644 index 000000000000..bb4f295c81e6 --- /dev/null +++ b/receiver/googlecloudspannerreceiver/internal/filterfactory/testhelpers_test.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filterfactory + +import ( + "github.com/stretchr/testify/mock" + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filter" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata" +) + +const ( + metricFullName = "metricFullName" + prefix1 = "prefix1-" + prefix2 = "prefix2-" +) + +type mockFilter struct { + mock.Mock +} + +func (f *mockFilter) Filter(source []*filter.Item) ([]*filter.Item, error) { + return source, nil +} + +func (f *mockFilter) Shutdown() error { + args := f.Called() + return args.Error(0) +} + +func (f *mockFilter) TotalLimit() int { + return 0 +} + +func (f *mockFilter) LimitByTimestamp() int { + return 0 +} + +func generateMetadataItems(prefixes []string, prefixHighCardinality []bool) []*metadata.MetricsMetadata { + metricDataType := metadata.NewMetricDataType(pdata.MetricDataTypeGauge, pdata.MetricAggregationTemporalityUnspecified, false) + metadataItems := make([]*metadata.MetricsMetadata, len(prefixes)) + + for i, prefix := range prefixes { + metadataItems[i] = &metadata.MetricsMetadata{ + MetricNamePrefix: prefix, + HighCardinality: prefixHighCardinality[i], + QueryMetricValuesMetadata: []metadata.MetricValueMetadata{ + metadata.NewInt64MetricValueMetadata("int64", "int64Column", metricDataType, "int64Unit"), + metadata.NewFloat64MetricValueMetadata("float64", "float64Column", metricDataType, "float64Unit"), + }, + } + } + + return metadataItems +}