Skip to content

Commit

Permalink
Use consumerhelper for exporterhelper, add WithCapabilities (#3186)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored May 14, 2021
1 parent a96e010 commit 52abb90
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 91 deletions.
27 changes: 17 additions & 10 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
)

// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
Expand Down Expand Up @@ -73,14 +75,15 @@ func (req *baseRequest) setContext(ctx context.Context) {
// baseSettings represents all the options that users can configure.
type baseSettings struct {
componentOptions []componenthelper.Option
consumerOptions []consumerhelper.Option
TimeoutSettings
QueueSettings
RetrySettings
ResourceToTelemetrySettings
}

// fromOptions returns the internal options starting from the default and applying all configured options.
func fromOptions(options []Option) *baseSettings {
func fromOptions(options ...Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
TimeoutSettings: DefaultTimeoutSettings(),
Expand Down Expand Up @@ -141,6 +144,15 @@ func WithQueue(queueSettings QueueSettings) Option {
}
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return func(o *baseSettings) {
o.consumerOptions = append(o.consumerOptions, consumerhelper.WithCapabilities(capabilities))
}
}

// WithResourceToTelemetryConversion overrides the default ResourceToTelemetrySettings for an exporter.
// The default ResourceToTelemetrySettings is to disable resource attributes to metric labels conversion.
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) Option {
Expand All @@ -152,18 +164,13 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel
// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
cfg config.Exporter
sender requestSender
qrSender *queuedRetrySender
convertResourceToTelemetry bool
sender requestSender
qrSender *queuedRetrySender
}

func newBaseExporter(cfg config.Exporter, logger *zap.Logger, options ...Option) *baseExporter {
bs := fromOptions(options)
func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings) *baseExporter {
be := &baseExporter{
Component: componenthelper.New(bs.componentOptions...),
cfg: cfg,
convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled,
Component: componenthelper.New(bs.componentOptions...),
}

be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
Expand Down
11 changes: 6 additions & 5 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestErrorToStatus(t *testing.T) {
}

func TestBaseExporter(t *testing.T) {
be := newBaseExporter(&defaultExporterCfg, zap.NewNop())
be := newBaseExporter(&defaultExporterCfg, zap.NewNop(), fromOptions())
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
}
Expand All @@ -54,10 +54,11 @@ func TestBaseExporterWithOptions(t *testing.T) {
be := newBaseExporter(
&defaultExporterCfg,
zap.NewNop(),
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()),
WithTimeout(DefaultTimeoutSettings()),
fromOptions(
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()),
WithTimeout(DefaultTimeoutSettings())),
)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,7 @@ func (req *logsRequest) count() int {

type logsExporter struct {
*baseExporter
pusher consumerhelper.ConsumeLogsFunc
}

func (lexp *logsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
return lexp.sender.send(newLogsRequest(ctx, ld, lexp.pusher))
consumer.Logs
}

// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span.
Expand All @@ -91,7 +83,8 @@ func NewLogsExporter(
return nil, errNilPushLogsData
}

be := newBaseExporter(cfg, logger, options...)
bs := fromOptions(options...)
be := newBaseExporter(cfg, logger, bs)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Expand All @@ -102,10 +95,14 @@ func NewLogsExporter(
}
})

lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error {
return be.sender.send(newLogsRequest(ctx, ld, pusher))
}, bs.consumerOptions...)

return &logsExporter{
baseExporter: be,
pusher: pusher,
}, nil
Logs: lc,
}, err
}

type logsExporterWithObservability struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
Expand Down Expand Up @@ -78,22 +80,33 @@ func TestLogsExporter_Default(t *testing.T) {
assert.NotNil(t, le)
assert.NoError(t, err)

assert.Nil(t, le.ConsumeLogs(context.Background(), ld))
assert.Nil(t, le.Shutdown(context.Background()))
assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities())
assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, le.ConsumeLogs(context.Background(), ld))
assert.NoError(t, le.Shutdown(context.Background()))
}

func TestLogsExporter_WithCapabilities(t *testing.T) {
capabilities := consumer.Capabilities{MutatesData: true}
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil), WithCapabilities(capabilities))
require.NoError(t, err)
require.NotNil(t, le)

assert.Equal(t, capabilities, le.Capabilities())
}

func TestLogsExporter_Default_ReturnError(t *testing.T) {
ld := testdata.GenerateLogDataEmpty()
want := errors.New("my_error")
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want))
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, le)
require.Equal(t, want, le.ConsumeLogs(context.Background(), ld))
}

func TestLogsExporter_WithRecordLogs(t *testing.T) {
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil))
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, le)

checkRecordedMetricsForLogsExporter(t, le, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,7 @@ func (req *metricsRequest) count() int {

type metricsExporter struct {
*baseExporter
pusher consumerhelper.ConsumeMetricsFunc
}

func (mexp *metricsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
if mexp.baseExporter.convertResourceToTelemetry {
md = convertResourceToLabels(md)
}
return mexp.sender.send(newMetricsRequest(ctx, md, mexp.pusher))
consumer.Metrics
}

// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span.
Expand All @@ -95,7 +84,8 @@ func NewMetricsExporter(
return nil, errNilPushMetricsData
}

be := newBaseExporter(cfg, logger, options...)
bs := fromOptions(options...)
be := newBaseExporter(cfg, logger, bs)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Expand All @@ -106,10 +96,17 @@ func NewMetricsExporter(
}
})

mc, err := consumerhelper.NewMetrics(func(ctx context.Context, md pdata.Metrics) error {
if bs.ResourceToTelemetrySettings.Enabled {
md = convertResourceToLabels(md)
}
return be.sender.send(newMetricsRequest(ctx, md, pusher))
}, bs.consumerOptions...)

return &metricsExporter{
baseExporter: be,
pusher: pusher,
}, nil
Metrics: mc,
}, err
}

type metricsSenderWithObservability struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
Expand Down Expand Up @@ -74,25 +76,36 @@ func TestMetricsExporter_NilPushMetricsData(t *testing.T) {
func TestMetricsExporter_Default(t *testing.T) {
md := testdata.GenerateMetricsEmpty()
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
assert.NoError(t, err)
assert.NotNil(t, me)

assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities())
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, me.ConsumeMetrics(context.Background(), md))
assert.NoError(t, me.Shutdown(context.Background()))
}

func TestMetricsExporter_WithCapabilities(t *testing.T) {
capabilities := consumer.Capabilities{MutatesData: true}
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithCapabilities(capabilities))
assert.NoError(t, err)
assert.NotNil(t, me)

assert.Nil(t, me.ConsumeMetrics(context.Background(), md))
assert.Nil(t, me.Shutdown(context.Background()))
assert.Equal(t, capabilities, me.Capabilities())
}

func TestMetricsExporter_Default_ReturnError(t *testing.T) {
md := testdata.GenerateMetricsEmpty()
want := errors.New("my_error")
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, me)
require.Equal(t, want, me.ConsumeMetrics(context.Background(), md))
}

func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, me, nil)
Expand All @@ -101,23 +114,23 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
want := errors.New("my_error")
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, me, want)
}

func TestMetricsExporter_WithSpan(t *testing.T) {
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, me, nil, 1)
}

func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
want := errors.New("my_error")
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want))
require.Nil(t, err)
require.NoError(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, me, want, 1)
}
Expand All @@ -130,7 +143,8 @@ func TestMetricsExporter_WithShutdown(t *testing.T) {
assert.NotNil(t, me)
assert.NoError(t, err)

assert.Nil(t, me.Shutdown(context.Background()))
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, me.Shutdown(context.Background()))
assert.True(t, shutdownCalled)
}

Expand All @@ -140,18 +154,20 @@ func TestMetricsExporter_WithResourceToTelemetryConversionDisabled(t *testing.T)
assert.NotNil(t, me)
assert.NoError(t, err)

assert.Nil(t, me.ConsumeMetrics(context.Background(), md))
assert.Nil(t, me.Shutdown(context.Background()))
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, me.ConsumeMetrics(context.Background(), md))
assert.NoError(t, me.Shutdown(context.Background()))
}

func TestMetricsExporter_WithResourceToTelemetryConversionEbabled(t *testing.T) {
func TestMetricsExporter_WithResourceToTelemetryConversionEnabled(t *testing.T) {
md := testdata.GenerateMetricsTwoMetrics()
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithResourceToTelemetryConversion(ResourceToTelemetrySettings{Enabled: true}))
assert.NotNil(t, me)
assert.NoError(t, err)

assert.Nil(t, me.ConsumeMetrics(context.Background(), md))
assert.Nil(t, me.Shutdown(context.Background()))
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, me.ConsumeMetrics(context.Background(), md))
assert.NoError(t, me.Shutdown(context.Background()))
}

func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) {
Expand All @@ -162,7 +178,8 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) {
assert.NotNil(t, me)
assert.NoError(t, err)

assert.Equal(t, me.Shutdown(context.Background()), want)
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, me.Shutdown(context.Background()))
}

func newPushMetricsData(retError error) consumerhelper.ConsumeMetricsFunc {
Expand Down
Loading

0 comments on commit 52abb90

Please sign in to comment.