Skip to content

Commit

Permalink
Instrument exporters' obsreport with OpenTelemetry (#6346)
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdandrutu authored Nov 1, 2022
2 parents 9a56290 + 487345c commit 0a55d0b
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 160 deletions.
16 changes: 16 additions & 0 deletions .chloggen/obsreport-exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: obsreport

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Instrument `obsreport.Exporter` metrics with otel-go"

# One or more tracking issues or pull requests related to the change
issues: [6346]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
141 changes: 132 additions & 9 deletions obsreport/obsreport_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,42 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

const (
exporterName = "exporter"

exporterScope = scopeName + nameSep + exporterName
)

// Exporter is a helper to add observability to a component.Exporter.
type Exporter struct {
level configtelemetry.Level
spanNamePrefix string
mutators []tag.Mutator
tracer trace.Tracer
logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
sentSpans syncint64.Counter
failedToSendSpans syncint64.Counter
sentMetricPoints syncint64.Counter
failedToSendMetricPoints syncint64.Counter
sentLogRecords syncint64.Counter
failedToSendLogRecords syncint64.Counter
}

// ExporterSettings are settings for creating an Exporter.
Expand All @@ -44,12 +66,75 @@ type ExporterSettings struct {

// NewExporter creates a new Exporter.
func NewExporter(cfg ExporterSettings) *Exporter {
return &Exporter{
return newExporter(cfg, featuregate.GetRegistry())
}

func newExporter(cfg ExporterSettings, registry *featuregate.Registry) *Exporter {
exp := &Exporter{
level: cfg.ExporterCreateSettings.TelemetrySettings.MetricsLevel,
spanNamePrefix: obsmetrics.ExporterPrefix + cfg.ExporterID.String(),
mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
tracer: cfg.ExporterCreateSettings.TracerProvider.Tracer(cfg.ExporterID.String()),
logger: cfg.ExporterCreateSettings.Logger,

useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()),
},
}

exp.createOtelMetrics(cfg)

return exp
}

func (exp *Exporter) createOtelMetrics(cfg ExporterSettings) {
if !exp.useOtelForMetrics {
return
}
meter := cfg.ExporterCreateSettings.MeterProvider.Meter(exporterScope)

var err error
handleError := func(metricName string, err error) {
if err != nil {
exp.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName))
}
}
exp.sentSpans, err = meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentSpansKey,
instrument.WithDescription("Number of spans successfully sent to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.SentSpansKey, err)

exp.failedToSendSpans, err = meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToSendSpansKey,
instrument.WithDescription("Number of spans in failed attempts to send to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.FailedToSendSpansKey, err)

exp.sentMetricPoints, err = meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentMetricPointsKey,
instrument.WithDescription("Number of metric points successfully sent to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.SentMetricPointsKey, err)

exp.failedToSendMetricPoints, err = meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToSendMetricPointsKey,
instrument.WithDescription("Number of metric points in failed attempts to send to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.FailedToSendMetricPointsKey, err)

exp.sentLogRecords, err = meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentLogRecordsKey,
instrument.WithDescription("Number of log record successfully sent to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.SentLogRecordsKey, err)

exp.failedToSendLogRecords, err = meter.SyncInt64().Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToSendLogRecordsKey,
instrument.WithDescription("Number of log records in failed attempts to send to destination."),
instrument.WithUnit(unit.Dimensionless))
handleError(obsmetrics.ExporterPrefix+obsmetrics.FailedToSendLogRecordsKey, err)
}

// StartTracesOp is called at the start of an Export operation.
Expand All @@ -62,7 +147,7 @@ func (exp *Exporter) StartTracesOp(ctx context.Context) context.Context {
// EndTracesOp completes the export operation that was started with StartTracesOp.
func (exp *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) {
numSent, numFailedToSend := toNumItems(numSpans, err)
exp.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentSpans, obsmetrics.ExporterFailedToSendSpans)
exp.recordMetrics(ctx, config.TracesDataType, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey)
}

Expand All @@ -77,7 +162,7 @@ func (exp *Exporter) StartMetricsOp(ctx context.Context) context.Context {
// StartMetricsOp.
func (exp *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err error) {
numSent, numFailedToSend := toNumItems(numMetricPoints, err)
exp.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentMetricPoints, obsmetrics.ExporterFailedToSendMetricPoints)
exp.recordMetrics(ctx, config.MetricsDataType, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey)
}

Expand All @@ -91,7 +176,7 @@ func (exp *Exporter) StartLogsOp(ctx context.Context) context.Context {
// EndLogsOp completes the export operation that was started with StartLogsOp.
func (exp *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error) {
numSent, numFailedToSend := toNumItems(numLogRecords, err)
exp.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentLogRecords, obsmetrics.ExporterFailedToSendLogRecords)
exp.recordMetrics(ctx, config.LogsDataType, numSent, numFailedToSend)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey)
}

Expand All @@ -103,16 +188,54 @@ func (exp *Exporter) startOp(ctx context.Context, operationSuffix string) contex
return ctx
}

func (exp *Exporter) recordMetrics(ctx context.Context, numSent, numFailedToSend int64, sentMeasure, failedToSendMeasure *stats.Int64Measure) {
func (exp *Exporter) recordMetrics(ctx context.Context, dataType config.DataType, numSent, numFailed int64) {
if exp.level == configtelemetry.LevelNone {
return
}
// Ignore the error for now. This should not happen.
if numFailedToSend > 0 {
_ = stats.RecordWithTags(ctx, exp.mutators, sentMeasure.M(numSent), failedToSendMeasure.M(numFailedToSend))
if exp.useOtelForMetrics {
exp.recordWithOtel(ctx, dataType, numSent, numFailed)
} else {
_ = stats.RecordWithTags(ctx, exp.mutators, sentMeasure.M(numSent))
exp.recordWithOC(ctx, dataType, numSent, numFailed)
}
}

func (exp *Exporter) recordWithOtel(ctx context.Context, dataType config.DataType, sent int64, failed int64) {
var sentMeasure, failedMeasure syncint64.Counter
switch dataType {
case config.TracesDataType:
sentMeasure = exp.sentSpans
failedMeasure = exp.failedToSendSpans
case config.MetricsDataType:
sentMeasure = exp.sentMetricPoints
failedMeasure = exp.failedToSendMetricPoints
case config.LogsDataType:
sentMeasure = exp.sentLogRecords
failedMeasure = exp.failedToSendLogRecords
}

sentMeasure.Add(ctx, sent, exp.otelAttrs...)
failedMeasure.Add(ctx, failed, exp.otelAttrs...)
}

func (exp *Exporter) recordWithOC(ctx context.Context, dataType config.DataType, sent int64, failed int64) {
var sentMeasure, failedMeasure *stats.Int64Measure
switch dataType {
case config.TracesDataType:
sentMeasure = obsmetrics.ExporterSentSpans
failedMeasure = obsmetrics.ExporterFailedToSendSpans
case config.MetricsDataType:
sentMeasure = obsmetrics.ExporterSentMetricPoints
failedMeasure = obsmetrics.ExporterFailedToSendMetricPoints
case config.LogsDataType:
sentMeasure = obsmetrics.ExporterSentLogRecords
failedMeasure = obsmetrics.ExporterFailedToSendLogRecords
}

_ = stats.RecordWithTags(
ctx,
exp.mutators,
sentMeasure.M(sent),
failedMeasure.M(failed))
}

func endSpan(ctx context.Context, err error, numSent, numFailedToSend int64, sentItemsKey, failedToSendItemsKey string) {
Expand Down
Loading

0 comments on commit 0a55d0b

Please sign in to comment.