From 2a7e7646b11cf307d5c8b204a391fc64fce83cd4 Mon Sep 17 00:00:00 2001 From: Jacob Colvin Date: Thu, 9 Nov 2023 00:06:37 -0600 Subject: [PATCH] Add initial telemetry for filterprocessor metrics --- processor/filterprocessor/factory.go | 5 + processor/filterprocessor/metrics.go | 12 +++ processor/filterprocessor/telemetry.go | 127 +++++++++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 processor/filterprocessor/telemetry.go diff --git a/processor/filterprocessor/factory.go b/processor/filterprocessor/factory.go index bf06c4548fc9..b75278a48ef5 100644 --- a/processor/filterprocessor/factory.go +++ b/processor/filterprocessor/factory.go @@ -15,6 +15,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/metadata" ) +const ( + // The value of "type" key in configuration. + typeStr = "filter" +) + var processorCapabilities = consumer.Capabilities{MutatesData: true} // NewFactory returns a new factory for the Filter processor. diff --git a/processor/filterprocessor/metrics.go b/processor/filterprocessor/metrics.go index 31fae0dfad5b..afe95df9ae86 100644 --- a/processor/filterprocessor/metrics.go +++ b/processor/filterprocessor/metrics.go @@ -5,6 +5,7 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle import ( "context" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" @@ -28,6 +29,7 @@ type filterMetricProcessor struct { skipResourceExpr expr.BoolExpr[ottlresource.TransformContext] skipMetricExpr expr.BoolExpr[ottlmetric.TransformContext] skipDataPointExpr expr.BoolExpr[ottldatapoint.TransformContext] + telemetry *filterProcessorTelemetry logger *zap.Logger } @@ -36,6 +38,13 @@ func newFilterMetricProcessor(set component.TelemetrySettings, cfg *Config) (*fi fsp := &filterMetricProcessor{ logger: set.Logger, } + + fpt, err := newfilterProcessorTelemetry(set) + if err != nil { + return nil, fmt.Errorf("error creating batch processor telemetry: %w", err) + } + fsp.telemetry = fpt + if cfg.Metrics.MetricConditions != nil || cfg.Metrics.DataPointConditions != nil { if cfg.Metrics.MetricConditions != nil { fsp.skipMetricExpr, err = filterottl.NewBoolExprForMetric(cfg.Metrics.MetricConditions, filterottl.StandardMetricFuncs(), cfg.ErrorMode, set) @@ -107,6 +116,8 @@ func (fmp *filterMetricProcessor) processMetrics(ctx context.Context, md pmetric return md, nil } + totalMetrics := md.ResourceMetrics().Len() + var errors error md.ResourceMetrics().RemoveIf(func(rmetrics pmetric.ResourceMetrics) bool { resource := rmetrics.Resource() @@ -168,6 +179,7 @@ func (fmp *filterMetricProcessor) processMetrics(ctx context.Context, md pmetric if md.ResourceMetrics().Len() == 0 { return md, processorhelper.ErrSkipProcessingData } + fmp.telemetry.record(int64(totalMetrics-md.ResourceMetrics().Len()), int64(totalMetrics)) return md, nil } diff --git a/processor/filterprocessor/telemetry.go b/processor/filterprocessor/telemetry.go new file mode 100644 index 000000000000..58318df405f0 --- /dev/null +++ b/processor/filterprocessor/telemetry.go @@ -0,0 +1,127 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package filterprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor" + +import ( + "context" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.uber.org/multierr" +) + +const ( + processorKey = "filter_processor" + scopeName = "go.opentelemetry.io/collector/processor/filterprocessor" +) + +var ( + processorTagKey = tag.MustNewKey(processorKey) + statMetricsFiltered = stats.Int64("metrics_filtered", "Number of metrics dropped by the filter processor", stats.UnitDimensionless) + statMetricsProcessed = stats.Int64("metrics_processed", "Number of metrics processed by the filter processor", stats.UnitDimensionless) +) + +func init() { + // TODO: Find a way to handle the error. + _ = view.Register(metricViews()...) +} + +func metricViews() []*view.View { + processorTagKeys := []tag.Key{processorTagKey} + + return []*view.View{ + { + Name: statMetricsFiltered.Name(), + Measure: statMetricsFiltered, + Description: statMetricsFiltered.Description(), + Aggregation: view.Count(), + TagKeys: processorTagKeys, + }, + { + Name: statMetricsProcessed.Name(), + Measure: statMetricsProcessed, + Description: statMetricsProcessed.Description(), + Aggregation: view.Count(), + TagKeys: processorTagKeys, + }, + } +} + +type filterProcessorTelemetry struct { + useOtel bool + + exportCtx context.Context + + processorAttr []attribute.KeyValue + droppedByFilter metric.Int64Counter + processedByFilter metric.Int64Counter +} + +func newfilterProcessorTelemetry(set component.TelemetrySettings) (*filterProcessorTelemetry, error) { + id, _ := set.Resource.Attributes().Get("ID") + + exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, id.Str())) + if err != nil { + return nil, err + } + + fpt := &filterProcessorTelemetry{ + useOtel: false, + processorAttr: []attribute.KeyValue{attribute.String(processorKey, id.Str())}, + exportCtx: exportCtx, + } + + if err = fpt.createOtelMetrics(set.MeterProvider); err != nil { + return nil, err + } + + return fpt, nil +} + +func (fpt *filterProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider) error { + if !fpt.useOtel { + return nil + } + + var errors, err error + meter := mp.Meter(scopeName) + + fpt.droppedByFilter, err = meter.Int64Counter( + processorhelper.BuildCustomMetricName(typeStr, "metrics_filtered"), + metric.WithDescription("Number of metrics dropped by the filter processor"), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + fpt.processedByFilter, err = meter.Int64Counter( + processorhelper.BuildCustomMetricName(typeStr, "metrics_processed"), + metric.WithDescription("Number of metrics processed by the filter processor"), + metric.WithUnit("1"), + ) + errors = multierr.Append(errors, err) + + return errors +} + +func (fpt *filterProcessorTelemetry) record(dropped, total int64) { + if fpt.useOtel { + fpt.recordWithOtel(dropped, total) + } else { + fpt.recordWithOC(dropped, total) + } +} + +func (fpt *filterProcessorTelemetry) recordWithOC(dropped, total int64) { + stats.Record(fpt.exportCtx, statMetricsFiltered.M(dropped)) + stats.Record(fpt.exportCtx, statMetricsProcessed.M(total)) +} + +func (fpt *filterProcessorTelemetry) recordWithOtel(dropped, total int64) { + fpt.droppedByFilter.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...)) + fpt.processedByFilter.Add(fpt.exportCtx, total, metric.WithAttributes(fpt.processorAttr...)) +}