Skip to content

Commit

Permalink
[exporterhelper] make enqueue failures available
Browse files Browse the repository at this point in the history
These metrics were only exporter either via OC or via the prometheus
exporter. Fixes #8673

Signed-off-by: Alex Boten <aboten@lightstep.com>
  • Loading branch information
Alex Boten committed Oct 12, 2023
1 parent 35cee30 commit 94e2f4c
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 141 deletions.
10 changes: 5 additions & 5 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
}

type obsrepSenderFactory func(obsrep *obsExporter) requestSender
type obsrepSenderFactory func(obsrep *ObsReport) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
Expand Down Expand Up @@ -143,7 +143,7 @@ type baseExporter struct {
signal component.DataType

set exporter.CreateSettings
obsrep *obsExporter
obsrep *ObsReport

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
Expand All @@ -163,7 +163,7 @@ type baseExporter struct {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

obsrep, err := newObsExporter(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
return nil, err
}
Expand All @@ -175,12 +175,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsrep),
obsrepSender: osf(obsReport),
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

set: set,
obsrep: obsrep,
obsrep: obsReport,
}

for _, op := range options {
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
}
)

func newNoopObsrepSender(_ *obsExporter) requestSender {
func newNoopObsrepSender(_ *ObsReport) requestSender {
return &baseRequestSender{}
}

Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewLogsExporter(
req := newLogsRequest(ctx, ld, pusher)
serr := be.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count()))
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeLogs, int64(req.Count()))
}
return serr
}, be.consumerOptions...)
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewLogsRequestExporter(
r := newRequest(ctx, req)
sErr := be.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count()))
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeLogs, int64(r.Count()))
}
return sErr
}, be.consumerOptions...)
Expand All @@ -164,10 +164,10 @@ func NewLogsRequestExporter(

type logsExporterWithObservability struct {
baseRequestSender
obsrep *obsExporter
obsrep *ObsReport
}

func newLogsExporterWithObservability(obsrep *obsExporter) requestSender {
func newLogsExporterWithObservability(obsrep *ObsReport) requestSender {
return &logsExporterWithObservability{obsrep: obsrep}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow
checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterName, int64(15))
require.NoError(t, tt.CheckExporterEnqueueFailedLogs(int64(15)))
}

func TestLogsExporter_WithSpan(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewMetricsExporter(
req := newMetricsRequest(ctx, md, pusher)
serr := be.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count()))
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeMetrics, int64(req.Count()))
}
return serr
}, be.consumerOptions...)
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewMetricsRequestExporter(
r := newRequest(ctx, req)
sErr := be.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count()))
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeMetrics, int64(r.Count()))
}
return sErr
}, be.consumerOptions...)
Expand All @@ -164,10 +164,10 @@ func NewMetricsRequestExporter(

type metricsSenderWithObservability struct {
baseRequestSender
obsrep *obsExporter
obsrep *ObsReport
}

func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender {
func newMetricsSenderWithObservability(obsrep *ObsReport) requestSender {
return &metricsSenderWithObservability{obsrep: obsrep}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
}

// 2 batched must be in queue, and 10 metric points rejected due to queue overflow
checkExporterEnqueueFailedMetricsStats(t, globalInstruments, fakeMetricsExporterName, int64(10))
require.NoError(t, tt.CheckExporterEnqueueFailedMetrics(int64(10)))
}

func TestMetricsExporter_WithSpan(t *testing.T) {
Expand Down
77 changes: 69 additions & 8 deletions exporter/exporterhelper/obsexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ type ObsReport struct {
tracer trace.Tracer
logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
sentSpans metric.Int64Counter
failedToSendSpans metric.Int64Counter
sentMetricPoints metric.Int64Counter
failedToSendMetricPoints metric.Int64Counter
sentLogRecords metric.Int64Counter
failedToSendLogRecords metric.Int64Counter
useOtelForMetrics bool
otelAttrs []attribute.KeyValue
sentSpans metric.Int64Counter
failedToSendSpans metric.Int64Counter
failedToEnqueueSpans metric.Int64Counter
sentMetricPoints metric.Int64Counter
failedToSendMetricPoints metric.Int64Counter
failedToEnqueueMetricPoints metric.Int64Counter
sentLogRecords metric.Int64Counter
failedToSendLogRecords metric.Int64Counter
failedToEnqueueLogRecords metric.Int64Counter
}

// ObsReportSettings are settings for creating an ObsReport.
Expand Down Expand Up @@ -96,6 +99,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueSpans, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueSpansKey,
metric.WithDescription("Number of spans failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.sentMetricPoints, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentMetricPointsKey,
metric.WithDescription("Number of metric points successfully sent to destination."),
Expand All @@ -108,6 +117,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueMetricPoints, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueMetricPointsKey,
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.sentLogRecords, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentLogRecordsKey,
metric.WithDescription("Number of log record successfully sent to destination."),
Expand All @@ -120,6 +135,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueLogRecords, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueLogRecordsKey,
metric.WithDescription("Number of log records failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

return errors
}

Expand Down Expand Up @@ -252,3 +273,43 @@ func toNumItems(numExportedItems int, err error) (int64, int64) {
}
return int64(numExportedItems), 0
}

func (or *ObsReport) recordEnqueueFailure(ctx context.Context, dataType component.DataType, failed int64) {
if or.useOtelForMetrics {
or.recordEnqueueFailureWithOtel(ctx, dataType, failed)
} else {
or.recordEnqueueFailureWithOC(ctx, dataType, failed)
}
}

func (or *ObsReport) recordEnqueueFailureWithOC(ctx context.Context, dataType component.DataType, failed int64) {
var failedMeasure *stats.Int64Measure
switch dataType {
case component.DataTypeTraces:
failedMeasure = obsmetrics.ExporterFailedToEnqueueSpans
case component.DataTypeMetrics:
failedMeasure = obsmetrics.ExporterFailedToEnqueueMetricPoints
case component.DataTypeLogs:
failedMeasure = obsmetrics.ExporterFailedToEnqueueLogRecords
}
if failed > 0 {
_ = stats.RecordWithTags(
ctx,
or.mutators,
failedMeasure.M(failed))
}
}

func (or *ObsReport) recordEnqueueFailureWithOtel(ctx context.Context, dataType component.DataType, failed int64) {
var enqueueFailedMeasure metric.Int64Counter
switch dataType {
case component.DataTypeTraces:
enqueueFailedMeasure = or.failedToEnqueueSpans
case component.DataTypeMetrics:
enqueueFailedMeasure = or.failedToEnqueueMetricPoints
case component.DataTypeLogs:
enqueueFailedMeasure = or.failedToEnqueueLogRecords
}

enqueueFailedMeasure.Add(ctx, failed, metric.WithAttributes(or.otelAttrs...))
}
73 changes: 3 additions & 70 deletions exporter/exporterhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import (
"context"

"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
Expand All @@ -26,12 +24,9 @@ func init() {
}

type instruments struct {
registry *metric.Registry
queueSize *metric.Int64DerivedGauge
queueCapacity *metric.Int64DerivedGauge
failedToEnqueueTraceSpans *metric.Int64Cumulative
failedToEnqueueMetricPoints *metric.Int64Cumulative
failedToEnqueueLogRecords *metric.Int64Cumulative
registry *metric.Registry
queueSize *metric.Int64DerivedGauge
queueCapacity *metric.Int64DerivedGauge
}

func newInstruments(registry *metric.Registry) *instruments {
Expand All @@ -49,67 +44,5 @@ func newInstruments(registry *metric.Registry) *instruments {
metric.WithDescription("Fixed capacity of the retry queue (in batches)"),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueTraceSpans, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_spans",
metric.WithDescription("Number of spans failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueMetricPoints, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_metric_points",
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueLogRecords, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_log_records",
metric.WithDescription("Number of log records failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

return insts
}

// obsExporter is a helper to add observability to an exporter.
type obsExporter struct {
*ObsReport
failedToEnqueueTraceSpansEntry *metric.Int64CumulativeEntry
failedToEnqueueMetricPointsEntry *metric.Int64CumulativeEntry
failedToEnqueueLogRecordsEntry *metric.Int64CumulativeEntry
}

// newObsExporter creates a new observability exporter.
func newObsExporter(cfg ObsReportSettings, insts *instruments) (*obsExporter, error) {
labelValue := metricdata.NewLabelValue(cfg.ExporterID.String())
failedToEnqueueTraceSpansEntry, _ := insts.failedToEnqueueTraceSpans.GetEntry(labelValue)
failedToEnqueueMetricPointsEntry, _ := insts.failedToEnqueueMetricPoints.GetEntry(labelValue)
failedToEnqueueLogRecordsEntry, _ := insts.failedToEnqueueLogRecords.GetEntry(labelValue)

exp, err := NewObsReport(cfg)
if err != nil {
return nil, err
}

return &obsExporter{
ObsReport: exp,
failedToEnqueueTraceSpansEntry: failedToEnqueueTraceSpansEntry,
failedToEnqueueMetricPointsEntry: failedToEnqueueMetricPointsEntry,
failedToEnqueueLogRecordsEntry: failedToEnqueueLogRecordsEntry,
}, nil
}

// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
func (eor *obsExporter) recordTracesEnqueueFailure(_ context.Context, numSpans int64) {
eor.failedToEnqueueTraceSpansEntry.Inc(numSpans)
}

// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
func (eor *obsExporter) recordMetricsEnqueueFailure(_ context.Context, numMetricPoints int64) {
eor.failedToEnqueueMetricPointsEntry.Inc(numMetricPoints)
}

// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
func (eor *obsExporter) recordLogsEnqueueFailure(_ context.Context, numLogRecords int64) {
eor.failedToEnqueueLogRecordsEntry.Inc(numLogRecords)
}
44 changes: 9 additions & 35 deletions exporter/exporterhelper/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/metric"
"go.opencensus.io/tag"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -22,47 +20,23 @@ func TestExportEnqueueFailure(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

insts := newInstruments(metric.NewRegistry())
obsrep, err := newObsExporter(ObsReportSettings{
obsrep, err := NewObsReport(ObsReportSettings{
ExporterID: exporterID,
ExporterCreateSettings: exporter.CreateSettings{ID: exporterID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()},
}, insts)
})
require.NoError(t, err)

logRecords := int64(7)
obsrep.recordLogsEnqueueFailure(context.Background(), logRecords)
checkExporterEnqueueFailedLogsStats(t, insts, exporterID, logRecords)
obsrep.recordEnqueueFailureWithOC(context.Background(), component.DataTypeLogs, logRecords)
require.NoError(t, tt.CheckExporterEnqueueFailedLogs(logRecords))

spans := int64(12)
obsrep.recordTracesEnqueueFailure(context.Background(), spans)
checkExporterEnqueueFailedTracesStats(t, insts, exporterID, spans)
obsrep.recordEnqueueFailureWithOC(context.Background(), component.DataTypeTraces, spans)
require.NoError(t, tt.CheckExporterEnqueueFailedTraces(spans))

metricPoints := int64(21)
obsrep.recordMetricsEnqueueFailure(context.Background(), metricPoints)
checkExporterEnqueueFailedMetricsStats(t, insts, exporterID, metricPoints)
obsrep.recordEnqueueFailureWithOC(context.Background(), component.DataTypeMetrics, metricPoints)
require.NoError(t, tt.CheckExporterEnqueueFailedMetrics(metricPoints))
}

// checkExporterEnqueueFailedTracesStats checks that reported number of spans failed to enqueue match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func checkExporterEnqueueFailedTracesStats(t *testing.T, insts *instruments, exporter component.ID, spans int64) {
checkValueForProducer(t, insts.registry, tagsForExporterView(exporter), spans, "exporter/enqueue_failed_spans")
}

// checkExporterEnqueueFailedMetricsStats checks that reported number of metric points failed to enqueue match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func checkExporterEnqueueFailedMetricsStats(t *testing.T, insts *instruments, exporter component.ID, metricPoints int64) {
checkValueForProducer(t, insts.registry, tagsForExporterView(exporter), metricPoints, "exporter/enqueue_failed_metric_points")
}

// checkExporterEnqueueFailedLogsStats checks that reported number of log records failed to enqueue match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func checkExporterEnqueueFailedLogsStats(t *testing.T, insts *instruments, exporter component.ID, logRecords int64) {
checkValueForProducer(t, insts.registry, tagsForExporterView(exporter), logRecords, "exporter/enqueue_failed_log_records")
}

// tagsForExporterView returns the tags that are needed for the exporter views.
func tagsForExporterView(exporter component.ID) []tag.Tag {
return []tag.Tag{
{Key: exporterTag, Value: exporter.String()},
}
}
// TODO: add test for validating recording enqueue failures for OTel
Loading

0 comments on commit 94e2f4c

Please sign in to comment.