Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] move kafka exporter to generated lifecycle tests #30531

Merged
merged 3 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}

Expand Down Expand Up @@ -178,6 +179,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}

Expand Down Expand Up @@ -208,5 +210,6 @@ func (f *kafkaExporterFactory) createLogsExporter(
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.BackOffConfig),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close))
}
13 changes: 10 additions & 3 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -124,9 +125,11 @@ func TestCreateMetricExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down Expand Up @@ -199,9 +202,11 @@ func TestCreateLogExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down Expand Up @@ -274,9 +279,11 @@ func TestCreateTraceExporter(t *testing.T) {
exportertest.NewNopCreateSettings(),
tc.conf,
)
require.NoError(t, err)
assert.NotNil(t, exporter, "Must return valid exporter")
err = exporter.Start(context.Background(), componenttest.NewNopHost())
if tc.err != nil {
assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
assert.Nil(t, exporter, "Must return nil value for invalid exporter")
return
}
assert.NoError(t, err, "Must not error")
Expand Down
101 changes: 101 additions & 0 deletions exporter/kafkaexporter/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 44 additions & 16 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"

"github.com/IBM/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -23,6 +24,7 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")

// kafkaTracesProducer uses sarama to produce trace messages to Kafka.
type kafkaTracesProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler TracesMarshaler
Expand Down Expand Up @@ -57,11 +59,24 @@ func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces)
}

func (e *kafkaTracesProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
return e.producer.Close()
}

func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error {
producer, err := newSaramaProducer(e.cfg)
atoulme marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
e.producer = producer
return nil
}

// kafkaMetricsProducer uses sarama to produce metrics messages to kafka
type kafkaMetricsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler MetricsMarshaler
Expand All @@ -87,11 +102,24 @@ func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.M
}

func (e *kafkaMetricsProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
return e.producer.Close()
}

func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error {
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
}
e.producer = producer
return nil
}

// kafkaLogsProducer uses sarama to produce logs messages to kafka
type kafkaLogsProducer struct {
cfg Config
producer sarama.SyncProducer
topic string
marshaler LogsMarshaler
Expand All @@ -117,9 +145,21 @@ func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) erro
}

func (e *kafkaLogsProducer) Close(context.Context) error {
if e.producer == nil {
return nil
}
return e.producer.Close()
}

func (e *kafkaLogsProducer) start(_ context.Context, _ component.Host) error {
producer, err := newSaramaProducer(e.cfg)
if err != nil {
return err
}
e.producer = producer
return nil
}

func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
c := sarama.NewConfig()

Expand Down Expand Up @@ -171,13 +211,8 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
}

return &kafkaMetricsProducer{
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand All @@ -196,12 +231,9 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma
keyableMarshaler.Key()
}
}
producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
}

return &kafkaTracesProducer{
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand All @@ -213,13 +245,9 @@ func newLogsExporter(config Config, set exporter.CreateSettings, marshalers map[
if marshaler == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
}

return &kafkaLogsProducer{
producer: producer,
cfg: config,
topic: config.Topic,
marshaler: marshaler,
logger: set.Logger,
Expand Down
24 changes: 16 additions & 8 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/IBM/sarama/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -26,8 +27,9 @@ import (
func TestNewExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
require.NoError(t, err)
err = texp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Nil(t, texp)
}

func TestNewExporter_err_encoding(t *testing.T) {
Expand All @@ -40,8 +42,9 @@ func TestNewExporter_err_encoding(t *testing.T) {
func TestNewMetricsExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
require.NoError(t, err)
err = mexp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Nil(t, mexp)
}

func TestNewMetricsExporter_err_encoding(t *testing.T) {
Expand All @@ -60,9 +63,10 @@ func TestNewMetricsExporter_err_traces_encoding(t *testing.T) {

func TestNewLogsExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
require.NoError(t, err)
err = lexp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Nil(t, mexp)
}

func TestNewLogsExporter_err_encoding(t *testing.T) {
Expand Down Expand Up @@ -98,17 +102,20 @@ func TestNewExporter_err_auth_type(t *testing.T) {
},
}
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
require.NoError(t, err)
err = texp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
assert.Nil(t, texp)
mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers())
require.NoError(t, err)
err = mexp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
assert.Nil(t, mexp)
lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers())
require.NoError(t, err)
err = lexp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to load TLS config")
assert.Nil(t, lexp)

}

Expand All @@ -120,9 +127,10 @@ func TestNewExporter_err_compression(t *testing.T) {
},
}
texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers())
require.NoError(t, err)
err = texp.start(context.Background(), componenttest.NewNopHost())
assert.Error(t, err)
assert.Contains(t, err.Error(), "producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value idk")
assert.Nil(t, texp)
}

func TestTracesPusher(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions exporter/kafkaexporter/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ status:
codeowners:
active: [pavolloffay, MovieStoreGuy]

# TODO: Update the exporter to pass the tests
tests:
config:
skip_lifecycle: true
skip_shutdown: true
Loading