Skip to content

Commit

Permalink
Move datapoint handling back to funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
TylerHelmuth committed Nov 16, 2022
1 parent a9a3a61 commit dc5531b
Showing 1 changed file with 79 additions and 46 deletions.
125 changes: 79 additions & 46 deletions processor/transformprocessor/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"context"
"go.opentelemetry.io/collector/pdata/pcommon"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -76,62 +77,38 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr
smetrics.Metrics().RemoveIf(func(metric pmetric.Metric) bool {
switch metric.Type() {
case pmetric.MetricTypeSum:
metric.Sum().DataPoints().RemoveIf(func(datapoint pmetric.NumberDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
err := d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
if err != nil {
errors = multierr.Append(errors, err)
}
return metric.Sum().DataPoints().Len() == 0
case pmetric.MetricTypeGauge:
metric.Gauge().DataPoints().RemoveIf(func(datapoint pmetric.NumberDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
err := d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
if err != nil {
errors = multierr.Append(errors, err)
}
return metric.Gauge().DataPoints().Len() == 0
case pmetric.MetricTypeHistogram:
metric.Histogram().DataPoints().RemoveIf(func(datapoint pmetric.HistogramDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
err := d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
if err != nil {
errors = multierr.Append(errors, err)
}
return metric.Histogram().DataPoints().Len() == 0
case pmetric.MetricTypeExponentialHistogram:
metric.ExponentialHistogram().DataPoints().RemoveIf(func(datapoint pmetric.ExponentialHistogramDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
err := d.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
if err != nil {
errors = multierr.Append(errors, err)
}
return metric.ExponentialHistogram().DataPoints().Len() == 0
case pmetric.MetricTypeSummary:
metric.Summary().DataPoints().RemoveIf(func(datapoint pmetric.SummaryDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
err := d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metric, smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource())
if err != nil {
errors = multierr.Append(errors, err)
}
return metric.Summary().DataPoints().Len() == 0
default:
return false
}
return false
})
return smetrics.Metrics().Len() == 0
})
Expand All @@ -140,6 +117,62 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr
return errors
}

func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error {
var errors error
dps.RemoveIf(func(datapoint pmetric.NumberDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource)
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
return errors
}

func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error {
var errors error
dps.RemoveIf(func(datapoint pmetric.HistogramDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource)
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
return errors
}

func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error {
var errors error
dps.RemoveIf(func(datapoint pmetric.ExponentialHistogramDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource)
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
return errors
}

func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error {
var errors error
dps.RemoveIf(func(datapoint pmetric.SummaryDataPoint) bool {
tCtx := ottldatapoint.NewTransformContext(datapoint, metric, metrics, is, resource)
remove, err := executeStatements(ctx, tCtx, d)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return bool(remove)
})
return errors
}

type MetricParserCollection struct {
parserCollection
metricParser ottl.Parser[ottlmetric.TransformContext]
Expand Down

0 comments on commit dc5531b

Please sign in to comment.