Skip to content

Commit

Permalink
Added metrics cardinality handling for Google Cloud Spanner receiver …
Browse files Browse the repository at this point in the history
…- part 2.

This part contains the item filter factory, which is responsible for calculating cardinality limits for metrics and constructing an item filter for each metric.
  • Loading branch information
ydrozhdzhal committed Nov 10, 2021
1 parent cff02cc commit 0c7b16e
Show file tree
Hide file tree
Showing 7 changed files with 749 additions and 0 deletions.
1 change: 1 addition & 0 deletions receiver/googlecloudspannerreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/stretchr/objx v0.1.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.1.0 // indirect
go.opentelemetry.io/otel/metric v0.24.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions receiver/googlecloudspannerreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filterfactory

import (
"errors"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata"
)

// filterBuilder constructs per-metric item filters from the item filter
// factory configuration, distributing the configured total cardinality limit
// between low and high cardinality metric groups.
type filterBuilder struct {
// logger receives the debug messages emitted while limits are calculated.
logger *zap.Logger
// config supplies the metadata items and the limits/amounts read by the
// builder (MetadataItems, TotalLimit, ProjectAmount, InstanceAmount,
// DatabaseAmount).
config *ItemFilterFactoryConfig
}

// buildFilterByMetricZeroTotalLimit returns a map keyed by full metric name
// (metric name prefix followed by the metric value name). Every metric
// declared in the configured metadata items is mapped to one shared filter
// obtained from filter.NewNopItemCardinalityFilter.
func (b filterBuilder) buildFilterByMetricZeroTotalLimit() map[string]filter.ItemFilter {
	filters := make(map[string]filter.ItemFilter)
	// A single instance is shared by all metrics.
	sharedNop := filter.NewNopItemCardinalityFilter()

	for _, item := range b.config.MetadataItems {
		for _, valueMetadata := range item.QueryMetricValuesMetadata {
			filters[item.MetricNamePrefix+valueMetadata.Name()] = sharedNop
		}
	}

	return filters
}

// buildFilterByMetricPositiveTotalLimit builds a filter for every metric in
// the configured metadata items, keyed by full metric name.
//
// Low cardinality groups are processed first, starting from the configured
// total limit; whatever limit remains is then handed to the high cardinality
// groups. If either step fails, a nil map and the error are returned.
func (b filterBuilder) buildFilterByMetricPositiveTotalLimit() (map[string]filter.ItemFilter, error) {
	filters := make(map[string]filter.ItemFilter)
	byCardinality := groupByCardinality(b.config.MetadataItems)

	// Low cardinality groups are stored under the false key.
	remaining, err := b.handleLowCardinalityGroups(byCardinality[false], b.config.TotalLimit, filters)
	if err != nil {
		return nil, err
	}

	// High cardinality groups are stored under the true key and receive the
	// limit left over by the low cardinality step.
	if remaining, err = b.handleHighCardinalityGroups(byCardinality[true], remaining, filters); err != nil {
		return nil, err
	}

	b.logger.Debug("Remaining total limit after cardinality limits calculation",
		zap.Int("remainingTotalLimit", remaining))

	return filters, nil
}

// handleLowCardinalityGroups registers filters in filterByMetric for every
// metric of the given low cardinality groups and returns the total limit that
// remains afterwards. With no groups, remainingTotalLimit is returned as is.
//
// The per-metric limit is the product of the configured project, instance and
// database amounts; it is used both as the total limit per metric and as the
// limit per timestamp.
func (b filterBuilder) handleLowCardinalityGroups(groups []*metadata.MetricsMetadata, remainingTotalLimit int,
	filterByMetric map[string]filter.ItemFilter) (int, error) {

	if len(groups) == 0 {
		return remainingTotalLimit, nil
	}

	cfg := b.config
	perMetricLimit := cfg.ProjectAmount * cfg.InstanceAmount * cfg.DatabaseAmount

	// Total and per-timestamp limits coincide for low cardinality metrics,
	// so only one value is logged.
	b.logger.Debug("Calculated cardinality limits for low cardinality metric group",
		zap.Int("limitPerMetricByTimestamp", perMetricLimit))

	return b.constructFiltersForGroups(perMetricLimit, perMetricLimit, groups, remainingTotalLimit, filterByMetric)
}

// handleHighCardinalityGroups registers filters in filterByMetric for every
// metric of the given high cardinality groups and returns the total limit that
// remains afterwards.
//
// The remaining total limit is split evenly between all metrics of the groups;
// the per-timestamp limit is that share divided by
// defaultMetricDataPointsAmountInPeriod. When there are no groups, or the
// groups declare no metrics at all, nothing is registered and
// remainingTotalLimit is returned unchanged. An error is returned when the
// calculated per-timestamp limit is lower than 1.
func (b filterBuilder) handleHighCardinalityGroups(groups []*metadata.MetricsMetadata, remainingTotalLimit int,
	filterByMetric map[string]filter.ItemFilter) (int, error) {

	if len(groups) == 0 {
		return remainingTotalLimit, nil
	}

	// Groups may be present yet declare no metric values; dividing by a zero
	// metric count would panic, and there is nothing to build filters for.
	metricsAmount := countMetricsInGroups(groups)
	if metricsAmount == 0 {
		return remainingTotalLimit, nil
	}

	totalLimitPerMetric := remainingTotalLimit / metricsAmount
	limitPerMetricByTimestamp := totalLimitPerMetric / defaultMetricDataPointsAmountInPeriod

	b.logger.Debug("Calculated cardinality limits for high cardinality metric group",
		zap.Int("limitPerMetricByTimestamp", limitPerMetricByTimestamp),
		zap.Int("totalLimitPerMetric", totalLimitPerMetric))

	if limitPerMetricByTimestamp < 1 {
		return remainingTotalLimit, errors.New("limit per metric per timestamp for high cardinality metrics is lower than 1")
	}

	return b.constructFiltersForGroups(totalLimitPerMetric, limitPerMetricByTimestamp, groups, remainingTotalLimit, filterByMetric)
}

// constructFiltersForGroups creates an item cardinality filter for each metric
// of the given groups and stores it in filterByMetric under the full metric
// name (metric name prefix followed by the metric value name).
//
// totalLimitPerMetric is subtracted from the running limit once per metric.
// On success the reduced limit is returned. If a filter cannot be created, the
// original remainingTotalLimit and the error are returned; filters stored
// before the failure stay in filterByMetric.
func (b filterBuilder) constructFiltersForGroups(totalLimitPerMetric int, limitPerMetricByTimestamp int,
	groups []*metadata.MetricsMetadata, remainingTotalLimit int, filterByMetric map[string]filter.ItemFilter) (int, error) {

	limitLeft := remainingTotalLimit

	for _, group := range groups {
		for _, valueMetadata := range group.QueryMetricValuesMetadata {
			limitLeft -= totalLimitPerMetric
			fullName := group.MetricNamePrefix + valueMetadata.Name()

			b.logger.Debug("Setting cardinality limits for metric",
				zap.String("metricFullName", fullName),
				zap.Int("limitPerMetricByTimestamp", limitPerMetricByTimestamp),
				zap.Int("totalLimitPerMetric", totalLimitPerMetric),
				zap.Int("remainingTotalLimit", limitLeft))

			cardinalityFilter, err := filter.NewItemCardinalityFilter(fullName, totalLimitPerMetric,
				limitPerMetricByTimestamp, defaultItemActivityPeriod, b.logger)
			if err != nil {
				// The caller gets back the limit it passed in, not the partially reduced one.
				return remainingTotalLimit, err
			}

			filterByMetric[fullName] = cardinalityFilter
		}
	}

	return limitLeft, nil
}

// countMetricsInGroups returns the total number of metric value metadata
// entries across all given metadata items.
func countMetricsInGroups(metadataItems []*metadata.MetricsMetadata) (amount int) {
	for i := 0; i < len(metadataItems); i++ {
		amount += len(metadataItems[i].QueryMetricValuesMetadata)
	}

	return amount
}

// groupByCardinality partitions metadata items by their HighCardinality flag.
// Items keep their input order within each partition; a key is present in the
// result only if at least one item has that flag value.
func groupByCardinality(metadataItems []*metadata.MetricsMetadata) map[bool][]*metadata.MetricsMetadata {
	partitions := map[bool][]*metadata.MetricsMetadata{}

	for i := range metadataItems {
		item := metadataItems[i]
		isHigh := item.HighCardinality
		partitions[isHigh] = append(partitions[isHigh], item)
	}

	return partitions
}
Loading

0 comments on commit 0c7b16e

Please sign in to comment.