Skip to content

Commit

Permalink
Add support for filterprocessor telemetry from logs, traces
Browse files Browse the repository at this point in the history
  • Loading branch information
MacroPower committed Nov 19, 2023
1 parent 0b6cbe1 commit e6497ac
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 14 deletions.
2 changes: 1 addition & 1 deletion processor/filterprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (fmp *filterMetricProcessor) processMetrics(ctx context.Context, md pmetric
})

metricCountAfterFilters := md.MetricCount()
fmp.telemetry.record(int64(metricCountBeforeFilters - metricCountAfterFilters))
fmp.telemetry.record(triggerMetricsDropped, int64(metricCountBeforeFilters-metricCountAfterFilters))

if errors != nil {
fmp.logger.Error("failed processing metrics", zap.Error(errors))
Expand Down
84 changes: 71 additions & 13 deletions processor/filterprocessor/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,24 @@ import (
const (
scopeName = "go.opentelemetry.io/collector/processor/filterprocessor"

metricFilteredDesc = "Number of metrics dropped by the filter processor"
metricsFilteredDesc = "Number of metrics dropped by the filter processor"
logsFilteredDesc = "Number of logs dropped by the filter processor"
tracesFilteredDesc = "Number of traces dropped by the filter processor"
)

type trigger int

const (
triggerMetricsDropped trigger = iota
triggerLogsDropped
triggerTracesDropped
)

var (
processorTagKey = tag.MustNewKey(typeStr)
statMetricsFiltered = stats.Int64("metrics.filtered", metricFilteredDesc, stats.UnitDimensionless)
statMetricsFiltered = stats.Int64("metrics.filtered", metricsFilteredDesc, stats.UnitDimensionless)
statLogsFiltered = stats.Int64("logs.filtered", logsFilteredDesc, stats.UnitDimensionless)
statTracesFiltered = stats.Int64("traces.filtered", tracesFilteredDesc, stats.UnitDimensionless)
)

func init() {
Expand All @@ -43,6 +55,20 @@ func metricViews() []*view.View {
Aggregation: view.Count(),
TagKeys: processorTagKeys,
},
{
Name: statLogsFiltered.Name(),
Measure: statLogsFiltered,
Description: statLogsFiltered.Description(),
Aggregation: view.Count(),
TagKeys: processorTagKeys,
},
{
Name: statTracesFiltered.Name(),
Measure: statTracesFiltered,
Description: statTracesFiltered.Description(),
Aggregation: view.Count(),
TagKeys: processorTagKeys,
},
}
}

Expand All @@ -51,8 +77,11 @@ type filterProcessorTelemetry struct {

exportCtx context.Context

processorAttr []attribute.KeyValue
droppedByFilter metric.Int64Counter
processorAttr []attribute.KeyValue

metricsFiltered metric.Int64Counter
logsFiltered metric.Int64Counter
tracesFiltered metric.Int64Counter
}

func newfilterProcessorTelemetry(set processor.CreateSettings) (*filterProcessorTelemetry, error) {
Expand Down Expand Up @@ -84,28 +113,57 @@ func (fpt *filterProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider)
var errors, err error
meter := mp.Meter(scopeName)

fpt.droppedByFilter, err = meter.Int64Counter(
fpt.metricsFiltered, err = meter.Int64Counter(
processorhelper.BuildCustomMetricName(typeStr, "metrics_filtered"),
metric.WithDescription(metricFilteredDesc),
metric.WithDescription(metricsFilteredDesc),
metric.WithUnit("1"),
)
errors = multierr.Append(errors, err)
fpt.logsFiltered, err = meter.Int64Counter(
processorhelper.BuildCustomMetricName(typeStr, "logs_filtered"),
metric.WithDescription(logsFilteredDesc),
metric.WithUnit("1"),
)
errors = multierr.Append(errors, err)
fpt.tracesFiltered, err = meter.Int64Counter(
processorhelper.BuildCustomMetricName(typeStr, "traces_filtered"),
metric.WithDescription(tracesFilteredDesc),
metric.WithUnit("1"),
)
errors = multierr.Append(errors, err)

return errors
}

func (fpt *filterProcessorTelemetry) record(dropped int64) {
func (fpt *filterProcessorTelemetry) record(trigger trigger, dropped int64) {
if fpt.useOtel {
fpt.recordWithOtel(dropped)
fpt.recordWithOtel(trigger, dropped)
} else {
fpt.recordWithOC(dropped)
fpt.recordWithOC(trigger, dropped)
}
}

func (fpt *filterProcessorTelemetry) recordWithOC(dropped int64) {
stats.Record(fpt.exportCtx, statMetricsFiltered.M(dropped))
func (fpt *filterProcessorTelemetry) recordWithOC(trigger trigger, dropped int64) {
var triggerMeasure *stats.Int64Measure
switch trigger {
case triggerMetricsDropped:
triggerMeasure = statMetricsFiltered
case triggerLogsDropped:
triggerMeasure = statLogsFiltered
case triggerTracesDropped:
triggerMeasure = statTracesFiltered
}

stats.Record(fpt.exportCtx, triggerMeasure.M(dropped))
}

func (fpt *filterProcessorTelemetry) recordWithOtel(dropped int64) {
fpt.droppedByFilter.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
func (fpt *filterProcessorTelemetry) recordWithOtel(trigger trigger, dropped int64) {
switch trigger {
case triggerMetricsDropped:
fpt.metricsFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
case triggerLogsDropped:
fpt.logsFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
case triggerTracesDropped:
fpt.tracesFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
}
}

0 comments on commit e6497ac

Please sign in to comment.