From 52abb90fa1a240c405cb77bf0716bd716d1d03ea Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 14 May 2021 14:25:18 -0700 Subject: [PATCH] Use consumerhelper for exporterhelper, add WithCapabilities (#3186) Signed-off-by: Bogdan Drutu --- exporter/exporterhelper/common.go | 27 ++++++----- exporter/exporterhelper/common_test.go | 11 ++--- .../exporterhelper/{logshelper.go => logs.go} | 21 ++++----- .../{logshelper_test.go => logs_test.go} | 21 +++++++-- .../{metricshelper.go => metrics.go} | 27 +++++------ ...{metricshelper_test.go => metrics_test.go} | 45 +++++++++++++------ exporter/exporterhelper/queued_retry_test.go | 22 ++++----- .../{tracehelper.go => traces.go} | 21 ++++----- .../{tracehelper_test.go => traces_test.go} | 31 +++++++++---- 9 files changed, 135 insertions(+), 91 deletions(-) rename exporter/exporterhelper/{logshelper.go => logs.go} (88%) rename exporter/exporterhelper/{logshelper_test.go => logs_test.go} (90%) rename exporter/exporterhelper/{metricshelper.go => metrics.go} (86%) rename exporter/exporterhelper/{metricshelper_test.go => metrics_test.go} (83%) rename exporter/exporterhelper/{tracehelper.go => traces.go} (88%) rename exporter/exporterhelper/{tracehelper_test.go => traces_test.go} (87%) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 82ae7e3a0458..27d533f5fd8c 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -23,6 +23,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenthelper" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerhelper" ) // TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend. @@ -73,6 +75,7 @@ func (req *baseRequest) setContext(ctx context.Context) { // baseSettings represents all the options that users can configure. type baseSettings struct { componentOptions []componenthelper.Option + consumerOptions []consumerhelper.Option TimeoutSettings QueueSettings RetrySettings @@ -80,7 +83,7 @@ type baseSettings struct { } // fromOptions returns the internal options starting from the default and applying all configured options. -func fromOptions(options []Option) *baseSettings { +func fromOptions(options ...Option) *baseSettings { // Start from the default options: opts := &baseSettings{ TimeoutSettings: DefaultTimeoutSettings(), @@ -141,6 +144,15 @@ func WithQueue(queueSettings QueueSettings) Option { } } +// WithCapabilities overrides the default Capabilities() function for a Consumer. +// The default is non-mutable data. +// TODO: Verify if we can change the default to be mutable as we do for processors. +func WithCapabilities(capabilities consumer.Capabilities) Option { + return func(o *baseSettings) { + o.consumerOptions = append(o.consumerOptions, consumerhelper.WithCapabilities(capabilities)) + } +} + // WithResourceToTelemetryConversion overrides the default ResourceToTelemetrySettings for an exporter. // The default ResourceToTelemetrySettings is to disable resource attributes to metric labels conversion. func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) Option { @@ -152,18 +164,13 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel // baseExporter contains common fields between different exporter types. type baseExporter struct { component.Component - cfg config.Exporter - sender requestSender - qrSender *queuedRetrySender - convertResourceToTelemetry bool + sender requestSender + qrSender *queuedRetrySender } -func newBaseExporter(cfg config.Exporter, logger *zap.Logger, options ...Option) *baseExporter { - bs := fromOptions(options) +func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) *baseExporter { be := &baseExporter{ - Component: componenthelper.New(bs.componentOptions...), - cfg: cfg, - convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled, + Component: componenthelper.New(bs.componentOptions...), } be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger) diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 3df557eb315f..c200bca7049a 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -44,7 +44,7 @@ func TestErrorToStatus(t *testing.T) { } func TestBaseExporter(t *testing.T) { - be := newBaseExporter(&defaultExporterCfg, zap.NewNop()) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions()) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) } @@ -54,10 +54,11 @@ func TestBaseExporterWithOptions(t *testing.T) { be := newBaseExporter( &defaultExporterCfg, zap.NewNop(), - WithStart(func(ctx context.Context, host component.Host) error { return want }), - WithShutdown(func(ctx context.Context) error { return want }), - WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()), - WithTimeout(DefaultTimeoutSettings()), + fromOptions( + WithStart(func(ctx context.Context, host component.Host) error { return want }), + WithShutdown(func(ctx context.Context) error { return want }), + WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()), + WithTimeout(DefaultTimeoutSettings())), ) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) require.Equal(t, want, be.Shutdown(context.Background())) diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logs.go similarity index 88% rename from exporter/exporterhelper/logshelper.go rename to exporter/exporterhelper/logs.go index a9480d9e404b..c9512108252b 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logs.go @@ -61,15 +61,7 @@ func (req *logsRequest) count() int { type logsExporter struct { *baseExporter - pusher consumerhelper.ConsumeLogsFunc -} - -func (lexp *logsExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { - return lexp.sender.send(newLogsRequest(ctx, ld, lexp.pusher)) + consumer.Logs } // NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span. @@ -91,7 +83,8 @@ func NewLogsExporter( return nil, errNilPushLogsData } - be := newBaseExporter(cfg, logger, options...) + bs := fromOptions(options...) + be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &logsExporterWithObservability{ obsrep: obsreport.NewExporter(obsreport.ExporterSettings{ @@ -102,10 +95,14 @@ func NewLogsExporter( } }) + lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error { + return be.sender.send(newLogsRequest(ctx, ld, pusher)) + }, bs.consumerOptions...) + return &logsExporter{ baseExporter: be, - pusher: pusher, - }, nil + Logs: lc, + }, err } type logsExporterWithObservability struct { diff --git a/exporter/exporterhelper/logshelper_test.go b/exporter/exporterhelper/logs_test.go similarity index 90% rename from exporter/exporterhelper/logshelper_test.go rename to exporter/exporterhelper/logs_test.go index 7c21c9797eab..95f74bf684cc 100644 --- a/exporter/exporterhelper/logshelper_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -24,7 +24,9 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" @@ -78,22 +80,33 @@ func TestLogsExporter_Default(t *testing.T) { assert.NotNil(t, le) assert.NoError(t, err) - assert.Nil(t, le.ConsumeLogs(context.Background(), ld)) - assert.Nil(t, le.Shutdown(context.Background())) + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.NoError(t, le.Shutdown(context.Background())) +} + +func TestLogsExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil), WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) } func TestLogsExporter_Default_ReturnError(t *testing.T) { ld := testdata.GenerateLogDataEmpty() want := errors.New("my_error") le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, le) require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) } func TestLogsExporter_WithRecordLogs(t *testing.T) { le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, le) checkRecordedMetricsForLogsExporter(t, le, nil) diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metrics.go similarity index 86% rename from exporter/exporterhelper/metricshelper.go rename to exporter/exporterhelper/metrics.go index 74538967c7d7..8ad834b07310 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metrics.go @@ -62,18 +62,7 @@ func (req *metricsRequest) count() int { type metricsExporter struct { *baseExporter - pusher consumerhelper.ConsumeMetricsFunc -} - -func (mexp *metricsExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { - if mexp.baseExporter.convertResourceToTelemetry { - md = convertResourceToLabels(md) - } - return mexp.sender.send(newMetricsRequest(ctx, md, mexp.pusher)) + consumer.Metrics } // NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span. @@ -95,7 +84,8 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - be := newBaseExporter(cfg, logger, options...) + bs := fromOptions(options...) + be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &metricsSenderWithObservability{ obsrep: obsreport.NewExporter(obsreport.ExporterSettings{ @@ -106,10 +96,17 @@ func NewMetricsExporter( } }) + mc, err := consumerhelper.NewMetrics(func(ctx context.Context, md pdata.Metrics) error { + if bs.ResourceToTelemetrySettings.Enabled { + md = convertResourceToLabels(md) + } + return be.sender.send(newMetricsRequest(ctx, md, pusher)) + }, bs.consumerOptions...) + return &metricsExporter{ baseExporter: be, - pusher: pusher, - }, nil + Metrics: mc, + }, err } type metricsSenderWithObservability struct { diff --git a/exporter/exporterhelper/metricshelper_test.go b/exporter/exporterhelper/metrics_test.go similarity index 83% rename from exporter/exporterhelper/metricshelper_test.go rename to exporter/exporterhelper/metrics_test.go index 1ca672450ca4..cd9ed3388bb1 100644 --- a/exporter/exporterhelper/metricshelper_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -24,7 +24,9 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" @@ -74,25 +76,36 @@ func TestMetricsExporter_NilPushMetricsData(t *testing.T) { func TestMetricsExporter_Default(t *testing.T) { md := testdata.GenerateMetricsEmpty() me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil)) + assert.NoError(t, err) assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + +func TestMetricsExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithCapabilities(capabilities)) assert.NoError(t, err) + assert.NotNil(t, me) - assert.Nil(t, me.ConsumeMetrics(context.Background(), md)) - assert.Nil(t, me.Shutdown(context.Background())) + assert.Equal(t, capabilities, me.Capabilities()) } func TestMetricsExporter_Default_ReturnError(t *testing.T) { md := testdata.GenerateMetricsEmpty() want := errors.New("my_error") me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, me) require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } func TestMetricsExporter_WithRecordMetrics(t *testing.T) { me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, me) checkRecordedMetricsForMetricsExporter(t, me, nil) @@ -101,7 +114,7 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) { func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, me) checkRecordedMetricsForMetricsExporter(t, me, want) @@ -109,7 +122,7 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { func TestMetricsExporter_WithSpan(t *testing.T) { me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, me) checkWrapSpanForMetricsExporter(t, me, nil, 1) } @@ -117,7 +130,7 @@ func TestMetricsExporter_WithSpan(t *testing.T) { func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) { want := errors.New("my_error") me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, me) checkWrapSpanForMetricsExporter(t, me, want, 1) } @@ -130,7 +143,8 @@ func TestMetricsExporter_WithShutdown(t *testing.T) { assert.NotNil(t, me) assert.NoError(t, err) - assert.Nil(t, me.Shutdown(context.Background())) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.Shutdown(context.Background())) assert.True(t, shutdownCalled) } @@ -140,18 +154,20 @@ func TestMetricsExporter_WithResourceToTelemetryConversionDisabled(t *testing.T) assert.NotNil(t, me) assert.NoError(t, err) - assert.Nil(t, me.ConsumeMetrics(context.Background(), md)) - assert.Nil(t, me.Shutdown(context.Background())) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) } -func TestMetricsExporter_WithResourceToTelemetryConversionEbabled(t *testing.T) { +func TestMetricsExporter_WithResourceToTelemetryConversionEnabled(t *testing.T) { md := testdata.GenerateMetricsTwoMetrics() me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithResourceToTelemetryConversion(ResourceToTelemetrySettings{Enabled: true})) assert.NotNil(t, me) assert.NoError(t, err) - assert.Nil(t, me.ConsumeMetrics(context.Background(), md)) - assert.Nil(t, me.Shutdown(context.Background())) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) } func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { @@ -162,7 +178,8 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { assert.NotNil(t, me) assert.NoError(t, err) - assert.Equal(t, me.Shutdown(context.Background()), want) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, want, me.Shutdown(context.Background())) } func newPushMetricsData(retError error) consumerhelper.ConsumeMetricsFunc { diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index 89569429fac7..570d761dd8c2 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -39,7 +39,7 @@ import ( func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := DefaultQueueSettings() rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -63,7 +63,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := DefaultQueueSettings() rCfg := DefaultRetrySettings() rCfg.Enabled = false - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -88,7 +88,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := DefaultRetrySettings() rCfg.InitialInterval = 0 - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -114,7 +114,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := DefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -147,7 +147,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := DefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -176,7 +176,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := DefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -214,7 +214,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := DefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -245,7 +245,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := DefaultRetrySettings() rCfg.InitialInterval = 0 - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -271,7 +271,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := DefaultQueueSettings() qCfg.QueueSize = 0 rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -289,7 +289,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := DefaultQueueSettings() rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -323,7 +323,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := DefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := DefaultRetrySettings() - be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions(WithRetry(rCfg), WithQueue(qCfg))) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) for i := 0; i < 7; i++ { diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/traces.go similarity index 88% rename from exporter/exporterhelper/tracehelper.go rename to exporter/exporterhelper/traces.go index 81ac5e847c41..eae4b16c5dae 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/traces.go @@ -61,15 +61,7 @@ func (req *tracesRequest) count() int { type traceExporter struct { *baseExporter - pusher consumerhelper.ConsumeTracesFunc -} - -func (texp *traceExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (texp *traceExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error { - return texp.sender.send(newTracesRequest(ctx, td, texp.pusher)) + consumer.Traces } // NewTracesExporter creates a TracesExporter that records observability metrics and wraps every request with a Span. @@ -92,7 +84,8 @@ func NewTracesExporter( return nil, errNilPushTraceData } - be := newBaseExporter(cfg, logger, options...) + bs := fromOptions(options...) + be := newBaseExporter(cfg, logger, bs) be.wrapConsumerSender(func(nextSender requestSender) requestSender { return &tracesExporterWithObservability{ obsrep: obsreport.NewExporter( @@ -104,10 +97,14 @@ func NewTracesExporter( } }) + tc, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error { + return be.sender.send(newTracesRequest(ctx, td, pusher)) + }, bs.consumerOptions...) + return &traceExporter{ baseExporter: be, - pusher: pusher, - }, nil + Traces: tc, + }, err } type tracesExporterWithObservability struct { diff --git a/exporter/exporterhelper/tracehelper_test.go b/exporter/exporterhelper/traces_test.go similarity index 87% rename from exporter/exporterhelper/tracehelper_test.go rename to exporter/exporterhelper/traces_test.go index 2b2b9204d96b..d56d2e861cdf 100644 --- a/exporter/exporterhelper/tracehelper_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -25,7 +25,9 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/consumer/pdata" @@ -86,15 +88,26 @@ func TestTracesExporter_Default(t *testing.T) { assert.NotNil(t, te) assert.NoError(t, err) - assert.Nil(t, te.ConsumeTraces(context.Background(), td)) - assert.Nil(t, te.Shutdown(context.Background())) + assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.NoError(t, te.Shutdown(context.Background())) +} + +func TestTracesExporter_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(nil), WithCapabilities(capabilities)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.Equal(t, capabilities, te.Capabilities()) } func TestTracesExporter_Default_ReturnError(t *testing.T) { td := pdata.NewTraces() want := errors.New("my_error") te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(want)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, te) err = te.ConsumeTraces(context.Background(), td) @@ -103,7 +116,7 @@ func TestTracesExporter_Default_ReturnError(t *testing.T) { func TestTracesExporter_WithRecordMetrics(t *testing.T) { te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(nil)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, te) checkRecordedMetricsForTracesExporter(t, te, nil) @@ -112,7 +125,7 @@ func TestTracesExporter_WithRecordMetrics(t *testing.T) { func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(want)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, te) checkRecordedMetricsForTracesExporter(t, te, want) @@ -120,7 +133,7 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { func TestTracesExporter_WithSpan(t *testing.T) { te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(nil)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, te) checkWrapSpanForTracesExporter(t, te, nil, 1) @@ -129,7 +142,7 @@ func TestTracesExporter_WithSpan(t *testing.T) { func TestTracesExporter_WithSpan_ReturnError(t *testing.T) { want := errors.New("my_error") te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(want)) - require.Nil(t, err) + require.NoError(t, err) require.NotNil(t, te) checkWrapSpanForTracesExporter(t, te, want, 1) @@ -143,7 +156,8 @@ func TestTracesExporter_WithShutdown(t *testing.T) { assert.NotNil(t, te) assert.NoError(t, err) - assert.Nil(t, te.Shutdown(context.Background())) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.Shutdown(context.Background())) assert.True(t, shutdownCalled) } @@ -155,6 +169,7 @@ func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { assert.NotNil(t, te) assert.NoError(t, err) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) assert.Equal(t, te.Shutdown(context.Background()), want) }