Skip to content

Commit

Permalink
Instrument obsreport.Processor (#6607)
Browse files Browse the repository at this point in the history
* Instrument obsreport.Processor

* add processor promchecker tests

* add chloggen

* run make genpdata and fix linter errors

* address review feedback and retest

* pass cfg into createOtelMetrics
  • Loading branch information
moh-osman3 authored Nov 28, 2022
1 parent 30cac33 commit 4ff1ff3
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 154 deletions.
11 changes: 11 additions & 0 deletions .chloggen/obsreport-processor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: obsreport

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Instrument `obsreport.Processor` metrics with otel-go"

# One or more tracking issues or pull requests related to the change
issues: [6607]
263 changes: 187 additions & 76 deletions obsreport/obsreport_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,25 @@ import (

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

var (
processorName = "processor"
processorScope = scopeName + nameSep + processorName
)

// BuildProcessorCustomMetricName is used to be build a metric name following
// the standards used in the Collector. The configType should be the same
// value used to identify the type on the config.
Expand All @@ -44,6 +57,21 @@ func BuildProcessorCustomMetricName(configType, metric string) string {
type Processor struct {
level configtelemetry.Level
mutators []tag.Mutator

logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue

acceptedSpansCounter syncint64.Counter
refusedSpansCounter syncint64.Counter
droppedSpansCounter syncint64.Counter
acceptedMetricPointsCounter syncint64.Counter
refusedMetricPointsCounter syncint64.Counter
droppedMetricPointsCounter syncint64.Counter
acceptedLogRecordsCounter syncint64.Counter
refusedLogRecordsCounter syncint64.Counter
droppedLogRecordsCounter syncint64.Counter
}

// ProcessorSettings are settings for creating a Processor.
Expand All @@ -54,134 +82,217 @@ type ProcessorSettings struct {

// NewProcessor creates a new Processor.
func NewProcessor(cfg ProcessorSettings) (*Processor, error) {
return &Processor{
level: cfg.ProcessorCreateSettings.MetricsLevel,
mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))},
}, nil
return newProcessor(cfg, featuregate.GetRegistry())
}

func newProcessor(cfg ProcessorSettings, registry *featuregate.Registry) (*Processor, error) {
proc := &Processor{
level: cfg.ProcessorCreateSettings.MetricsLevel,
mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))},
logger: cfg.ProcessorCreateSettings.Logger,
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ProcessorKey, cfg.ProcessorID.String()),
},
}

if err := proc.createOtelMetrics(cfg); err != nil {
return nil, err
}

return proc, nil
}

func (por *Processor) createOtelMetrics(cfg ProcessorSettings) error {
if !por.useOtelForMetrics {
return nil
}
meter := cfg.ProcessorCreateSettings.MeterProvider.Meter(processorScope)
var errors, err error

por.acceptedSpansCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.AcceptedSpansKey,
instrument.WithDescription("Number of spans successfully pushed into the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.refusedSpansCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.RefusedSpansKey,
instrument.WithDescription("Number of spans that were rejected by the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.droppedSpansCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey,
instrument.WithDescription("Number of spans that were dropped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.acceptedMetricPointsCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.AcceptedMetricPointsKey,
instrument.WithDescription("Number of metric points successfully pushed into the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.refusedMetricPointsCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey,
instrument.WithDescription("Number of metric points that were rejected by the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.droppedMetricPointsCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.DroppedMetricPointsKey,
instrument.WithDescription("Number of metric points that were dropped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.acceptedLogRecordsCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.AcceptedLogRecordsKey,
instrument.WithDescription("Number of log records successfully pushed into the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.refusedLogRecordsCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.RefusedLogRecordsKey,
instrument.WithDescription("Number of log records that were rejected by the next component in the pipeline."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

por.droppedLogRecordsCounter, err = meter.SyncInt64().Counter(
obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey,
instrument.WithDescription("Number of log records that were dropped."),
instrument.WithUnit(unit.Dimensionless),
)
errors = multierr.Append(errors, err)

return errors
}

func (por *Processor) recordWithOtel(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
var acceptedCount, refusedCount, droppedCount syncint64.Counter
switch dataType {
case component.DataTypeTraces:
acceptedCount = por.acceptedSpansCounter
refusedCount = por.refusedSpansCounter
droppedCount = por.droppedSpansCounter
case component.DataTypeMetrics:
acceptedCount = por.acceptedMetricPointsCounter
refusedCount = por.refusedMetricPointsCounter
droppedCount = por.droppedMetricPointsCounter
case component.DataTypeLogs:
acceptedCount = por.acceptedLogRecordsCounter
refusedCount = por.refusedLogRecordsCounter
droppedCount = por.droppedLogRecordsCounter
}

acceptedCount.Add(ctx, accepted, por.otelAttrs...)
refusedCount.Add(ctx, refused, por.otelAttrs...)
droppedCount.Add(ctx, dropped, por.otelAttrs...)
}

func (por *Processor) recordWithOC(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
var acceptedMeasure, refusedMeasure, droppedMeasure *stats.Int64Measure

switch dataType {
case component.DataTypeTraces:
acceptedMeasure = obsmetrics.ProcessorAcceptedSpans
refusedMeasure = obsmetrics.ProcessorRefusedSpans
droppedMeasure = obsmetrics.ProcessorDroppedSpans
case component.DataTypeMetrics:
acceptedMeasure = obsmetrics.ProcessorAcceptedMetricPoints
refusedMeasure = obsmetrics.ProcessorRefusedMetricPoints
droppedMeasure = obsmetrics.ProcessorDroppedMetricPoints
case component.DataTypeLogs:
acceptedMeasure = obsmetrics.ProcessorAcceptedLogRecords
refusedMeasure = obsmetrics.ProcessorRefusedLogRecords
droppedMeasure = obsmetrics.ProcessorDroppedLogRecords
}

// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
acceptedMeasure.M(accepted),
refusedMeasure.M(refused),
droppedMeasure.M(dropped),
)
}

func (por *Processor) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
if por.useOtelForMetrics {
por.recordWithOtel(ctx, dataType, accepted, refused, dropped)
} else {
por.recordWithOC(ctx, dataType, accepted, refused, dropped)
}
}

// TracesAccepted reports that the trace data was accepted.
func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedSpans.M(int64(numSpans)),
obsmetrics.ProcessorRefusedSpans.M(0),
obsmetrics.ProcessorDroppedSpans.M(0),
)
por.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0))
}
}

// TracesRefused reports that the trace data was refused.
func (por *Processor) TracesRefused(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedSpans.M(0),
obsmetrics.ProcessorRefusedSpans.M(int64(numSpans)),
obsmetrics.ProcessorDroppedSpans.M(0),
)
por.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0))
}
}

// TracesDropped reports that the trace data was dropped.
func (por *Processor) TracesDropped(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedSpans.M(0),
obsmetrics.ProcessorRefusedSpans.M(0),
obsmetrics.ProcessorDroppedSpans.M(int64(numSpans)),
)
por.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans))
}
}

// MetricsAccepted reports that the metrics were accepted.
func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedMetricPoints.M(int64(numPoints)),
obsmetrics.ProcessorRefusedMetricPoints.M(0),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
por.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0))
}
}

// MetricsRefused reports that the metrics were refused.
func (por *Processor) MetricsRefused(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedMetricPoints.M(0),
obsmetrics.ProcessorRefusedMetricPoints.M(int64(numPoints)),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
por.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0))
}
}

// MetricsDropped reports that the metrics were dropped.
func (por *Processor) MetricsDropped(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedMetricPoints.M(0),
obsmetrics.ProcessorRefusedMetricPoints.M(0),
obsmetrics.ProcessorDroppedMetricPoints.M(int64(numPoints)),
)
por.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints))
}
}

// LogsAccepted reports that the logs were accepted.
func (por *Processor) LogsAccepted(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedLogRecords.M(int64(numRecords)),
obsmetrics.ProcessorRefusedLogRecords.M(0),
obsmetrics.ProcessorDroppedLogRecords.M(0),
)
por.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0))
}
}

// LogsRefused reports that the logs were refused.
func (por *Processor) LogsRefused(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedLogRecords.M(0),
obsmetrics.ProcessorRefusedLogRecords.M(int64(numRecords)),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
por.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0))
}
}

// LogsDropped reports that the logs were dropped.
func (por *Processor) LogsDropped(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone {
// ignore the error for now; should not happen
_ = stats.RecordWithTags(
ctx,
por.mutators,
obsmetrics.ProcessorAcceptedLogRecords.M(0),
obsmetrics.ProcessorRefusedLogRecords.M(0),
obsmetrics.ProcessorDroppedLogRecords.M(int64(numRecords)),
)
por.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords))
}
}
Loading

0 comments on commit 4ff1ff3

Please sign in to comment.