Skip to content

Commit

Permalink
Add initial telemetry for filterprocessor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
MacroPower committed Nov 9, 2023
1 parent eb320a1 commit 2a7e764
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 0 deletions.
5 changes: 5 additions & 0 deletions processor/filterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/metadata"
)

const (
// The value of "type" key in configuration.
typeStr = "filter"
)

var processorCapabilities = consumer.Capabilities{MutatesData: true}

// NewFactory returns a new factory for the Filter processor.
Expand Down
12 changes: 12 additions & 0 deletions processor/filterprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -28,6 +29,7 @@ type filterMetricProcessor struct {
skipResourceExpr expr.BoolExpr[ottlresource.TransformContext]
skipMetricExpr expr.BoolExpr[ottlmetric.TransformContext]
skipDataPointExpr expr.BoolExpr[ottldatapoint.TransformContext]
telemetry *filterProcessorTelemetry
logger *zap.Logger
}

Expand All @@ -36,6 +38,13 @@ func newFilterMetricProcessor(set component.TelemetrySettings, cfg *Config) (*fi
fsp := &filterMetricProcessor{
logger: set.Logger,
}

fpt, err := newfilterProcessorTelemetry(set)
if err != nil {
return nil, fmt.Errorf("error creating batch processor telemetry: %w", err)
}
fsp.telemetry = fpt

if cfg.Metrics.MetricConditions != nil || cfg.Metrics.DataPointConditions != nil {
if cfg.Metrics.MetricConditions != nil {
fsp.skipMetricExpr, err = filterottl.NewBoolExprForMetric(cfg.Metrics.MetricConditions, filterottl.StandardMetricFuncs(), cfg.ErrorMode, set)
Expand Down Expand Up @@ -107,6 +116,8 @@ func (fmp *filterMetricProcessor) processMetrics(ctx context.Context, md pmetric
return md, nil
}

totalMetrics := md.ResourceMetrics().Len()

var errors error
md.ResourceMetrics().RemoveIf(func(rmetrics pmetric.ResourceMetrics) bool {
resource := rmetrics.Resource()
Expand Down Expand Up @@ -168,6 +179,7 @@ func (fmp *filterMetricProcessor) processMetrics(ctx context.Context, md pmetric
if md.ResourceMetrics().Len() == 0 {
return md, processorhelper.ErrSkipProcessingData
}
fmp.telemetry.record(int64(totalMetrics-md.ResourceMetrics().Len()), int64(totalMetrics))
return md, nil
}

Expand Down
127 changes: 127 additions & 0 deletions processor/filterprocessor/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package filterprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"

import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
)

const (
processorKey = "filter_processor"
scopeName = "go.opentelemetry.io/collector/processor/filterprocessor"
)

var (
processorTagKey = tag.MustNewKey(processorKey)
statMetricsFiltered = stats.Int64("metrics_filtered", "Number of metrics dropped by the filter processor", stats.UnitDimensionless)
statMetricsProcessed = stats.Int64("metrics_processed", "Number of metrics processed by the filter processor", stats.UnitDimensionless)
)

func init() {
// TODO: Find a way to handle the error.
_ = view.Register(metricViews()...)
}

func metricViews() []*view.View {
processorTagKeys := []tag.Key{processorTagKey}

return []*view.View{
{
Name: statMetricsFiltered.Name(),
Measure: statMetricsFiltered,
Description: statMetricsFiltered.Description(),
Aggregation: view.Count(),
TagKeys: processorTagKeys,
},
{
Name: statMetricsProcessed.Name(),
Measure: statMetricsProcessed,
Description: statMetricsProcessed.Description(),
Aggregation: view.Count(),
TagKeys: processorTagKeys,
},
}
}

type filterProcessorTelemetry struct {
useOtel bool

exportCtx context.Context

processorAttr []attribute.KeyValue
droppedByFilter metric.Int64Counter
processedByFilter metric.Int64Counter
}

func newfilterProcessorTelemetry(set component.TelemetrySettings) (*filterProcessorTelemetry, error) {
id, _ := set.Resource.Attributes().Get("ID")

exportCtx, err := tag.New(context.Background(), tag.Insert(processorTagKey, id.Str()))
if err != nil {
return nil, err
}

fpt := &filterProcessorTelemetry{
useOtel: false,
processorAttr: []attribute.KeyValue{attribute.String(processorKey, id.Str())},
exportCtx: exportCtx,
}

if err = fpt.createOtelMetrics(set.MeterProvider); err != nil {
return nil, err
}

return fpt, nil
}

func (fpt *filterProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider) error {
if !fpt.useOtel {
return nil
}

var errors, err error
meter := mp.Meter(scopeName)

fpt.droppedByFilter, err = meter.Int64Counter(
processorhelper.BuildCustomMetricName(typeStr, "metrics_filtered"),
metric.WithDescription("Number of metrics dropped by the filter processor"),
metric.WithUnit("1"),
)
errors = multierr.Append(errors, err)
fpt.processedByFilter, err = meter.Int64Counter(
processorhelper.BuildCustomMetricName(typeStr, "metrics_processed"),
metric.WithDescription("Number of metrics processed by the filter processor"),
metric.WithUnit("1"),
)
errors = multierr.Append(errors, err)

return errors
}

func (fpt *filterProcessorTelemetry) record(dropped, total int64) {
if fpt.useOtel {
fpt.recordWithOtel(dropped, total)
} else {
fpt.recordWithOC(dropped, total)
}
}

func (fpt *filterProcessorTelemetry) recordWithOC(dropped, total int64) {
stats.Record(fpt.exportCtx, statMetricsFiltered.M(dropped))
stats.Record(fpt.exportCtx, statMetricsProcessed.M(total))
}

func (fpt *filterProcessorTelemetry) recordWithOtel(dropped, total int64) {
fpt.droppedByFilter.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
fpt.processedByFilter.Add(fpt.exportCtx, total, metric.WithAttributes(fpt.processorAttr...))
}

0 comments on commit 2a7e764

Please sign in to comment.