From dc5531be8b512b0d1002da62f47473f213535f6f Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Wed, 16 Nov 2022 09:52:45 -0700 Subject: [PATCH] Move datapoint handling back to funcs --- .../internal/common/metrics.go | 125 +++++++++++------- 1 file changed, 79 insertions(+), 46 deletions(-) diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index 8e6b71c9ef92..3e3ec43b9f34 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -16,6 +16,7 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "context" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -76,62 +77,38 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr smetrics.Metrics().RemoveIf(func(metric pmetric.Metric) bool { switch metric.Type() { case pmetric.MetricTypeSum: - metric.Sum().DataPoints().RemoveIf(func(datapoint pmetric.NumberDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.Sum().DataPoints().Len() == 0 case pmetric.MetricTypeGauge: - metric.Gauge().DataPoints().RemoveIf(func(datapoint pmetric.NumberDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.Gauge().DataPoints().Len() == 0 case pmetric.MetricTypeHistogram: - metric.Histogram().DataPoints().RemoveIf(func(datapoint pmetric.HistogramDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.Histogram().DataPoints().Len() == 0 case pmetric.MetricTypeExponentialHistogram: - metric.ExponentialHistogram().DataPoints().RemoveIf(func(datapoint pmetric.ExponentialHistogramDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.ExponentialHistogram().DataPoints().Len() == 0 case pmetric.MetricTypeSummary: - metric.Summary().DataPoints().RemoveIf(func(datapoint pmetric.SummaryDataPoint) bool { - tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) - remove, err := executeStatements(ctx, tCtx, d) - if err != nil { - errors = multierr.Append(errors, err) - return false - } - return bool(remove) - }) + err := d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource()) + if err != nil { + errors = multierr.Append(errors, err) + } return metric.Summary().DataPoints().Len() == 0 + default: + return false } - return false }) return smetrics.Metrics().Len() == 0 }) @@ -140,6 +117,62 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr return errors } +func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + var errors error + dps.RemoveIf(func(datapoint pmetric.NumberDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return errors +} + +func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + var errors error + dps.RemoveIf(func(datapoint pmetric.HistogramDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return errors +} + +func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + var errors error + dps.RemoveIf(func(datapoint pmetric.ExponentialHistogramDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return errors +} + +func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + var errors error + dps.RemoveIf(func(datapoint pmetric.SummaryDataPoint) bool { + tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource) + remove, err := executeStatements(ctx, tCtx, d) + if err != nil { + errors = multierr.Append(errors, err) + return false + } + return bool(remove) + }) + return errors +} + type MetricParserCollection struct { parserCollection metricParser ottl.Parser[ottlmetric.TransformContext]