diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 35ba89d41e15..e822b0a005f4 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -148,6 +148,7 @@ func (f *kafkaExporterFactory) createTracesExporter( exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(oCfg.BackOffConfig), exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(exp.start), exporterhelper.WithShutdown(exp.Close)) } @@ -178,6 +179,7 @@ func (f *kafkaExporterFactory) createMetricsExporter( exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(oCfg.BackOffConfig), exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(exp.start), exporterhelper.WithShutdown(exp.Close)) } @@ -208,5 +210,6 @@ func (f *kafkaExporterFactory) createLogsExporter( exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(oCfg.BackOffConfig), exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(exp.start), exporterhelper.WithShutdown(exp.Close)) } diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index 7f83a4c33857..6760029478df 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -11,6 +11,7 @@ import ( "github.com/IBM/sarama" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/plog" @@ -124,9 +125,11 @@ func TestCreateMetricExporter(t *testing.T) { exportertest.NewNopCreateSettings(), tc.conf, ) + require.NoError(t, err) + assert.NotNil(t, exporter, "Must return valid exporter") + err = exporter.Start(context.Background(), componenttest.NewNopHost()) if tc.err != nil { assert.ErrorAs(t, err, &tc.err, "Must match the expected error") - assert.Nil(t, exporter, "Must return nil value for invalid exporter") return } assert.NoError(t, err, "Must not error") @@ -199,9 +202,11 @@ func TestCreateLogExporter(t *testing.T) { exportertest.NewNopCreateSettings(), tc.conf, ) + require.NoError(t, err) + assert.NotNil(t, exporter, "Must return valid exporter") + err = exporter.Start(context.Background(), componenttest.NewNopHost()) if tc.err != nil { assert.ErrorAs(t, err, &tc.err, "Must match the expected error") - assert.Nil(t, exporter, "Must return nil value for invalid exporter") return } assert.NoError(t, err, "Must not error") @@ -274,9 +279,11 @@ func TestCreateTraceExporter(t *testing.T) { exportertest.NewNopCreateSettings(), tc.conf, ) + require.NoError(t, err) + assert.NotNil(t, exporter, "Must return valid exporter") + err = exporter.Start(context.Background(), componenttest.NewNopHost()) if tc.err != nil { assert.ErrorAs(t, err, &tc.err, "Must match the expected error") - assert.Nil(t, exporter, "Must return nil value for invalid exporter") return } assert.NoError(t, err, "Must not error") diff --git a/exporter/kafkaexporter/generated_component_test.go b/exporter/kafkaexporter/generated_component_test.go new file mode 100644 index 000000000000..06bd42403b25 --- /dev/null +++ b/exporter/kafkaexporter/generated_component_test.go @@ -0,0 +1,101 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package kafkaexporter + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestComponentLifecycle(t *testing.T) { + factory := NewFactory() + + tests := []struct { + name string + createFn func(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (component.Component, error) + }{ + + { + name: "logs", + createFn: func(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsExporter(ctx, set, cfg) + }, + }, + + { + name: "metrics", + createFn: func(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateMetricsExporter(ctx, set, cfg) + }, + }, + + { + name: "traces", + createFn: func(ctx context.Context, set exporter.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateTracesExporter(ctx, set, cfg) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, component.UnmarshalConfig(sub, cfg)) + + for _, test := range tests { + t.Run(test.name+"-shutdown", func(t *testing.T) { + c, err := test.createFn(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + } +} + +func generateLifecycleTestLogs() plog.Logs { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("resource", "R1") + l := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + l.Body().SetStr("test log message") + l.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return logs +} + +func generateLifecycleTestMetrics() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("resource", "R1") + m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + m.SetName("test_metric") + dp := m.SetEmptyGauge().DataPoints().AppendEmpty() + dp.Attributes().PutStr("test_attr", "value_1") + dp.SetIntValue(123) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return metrics +} + +func generateLifecycleTestTraces() ptrace.Traces { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + rs.Resource().Attributes().PutStr("resource", "R1") + span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.Attributes().PutStr("test_attr", "value_1") + span.SetName("test_span") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second))) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return traces +} diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 47417f5ee4c9..c98abdc674c0 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/IBM/sarama" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/plog" @@ -23,6 +24,7 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") // kafkaTracesProducer uses sarama to produce trace messages to Kafka. type kafkaTracesProducer struct { + cfg Config producer sarama.SyncProducer topic string marshaler TracesMarshaler @@ -57,11 +59,24 @@ func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces) } func (e *kafkaTracesProducer) Close(context.Context) error { + if e.producer == nil { + return nil + } return e.producer.Close() } +func (e *kafkaTracesProducer) start(_ context.Context, _ component.Host) error { + producer, err := newSaramaProducer(e.cfg) + if err != nil { + return err + } + e.producer = producer + return nil +} + // kafkaMetricsProducer uses sarama to produce metrics messages to kafka type kafkaMetricsProducer struct { + cfg Config producer sarama.SyncProducer topic string marshaler MetricsMarshaler @@ -87,11 +102,24 @@ func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.M } func (e *kafkaMetricsProducer) Close(context.Context) error { + if e.producer == nil { + return nil + } return e.producer.Close() } +func (e *kafkaMetricsProducer) start(_ context.Context, _ component.Host) error { + producer, err := newSaramaProducer(e.cfg) + if err != nil { + return err + } + e.producer = producer + return nil +} + // kafkaLogsProducer uses sarama to produce logs messages to kafka type kafkaLogsProducer struct { + cfg Config producer sarama.SyncProducer topic string marshaler LogsMarshaler @@ -117,9 +145,21 @@ func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) erro } func (e *kafkaLogsProducer) Close(context.Context) error { + if e.producer == nil { + return nil + } return e.producer.Close() } +func (e *kafkaLogsProducer) start(_ context.Context, _ component.Host) error { + producer, err := newSaramaProducer(e.cfg) + if err != nil { + return err + } + e.producer = producer + return nil +} + func newSaramaProducer(config Config) (sarama.SyncProducer, error) { c := sarama.NewConfig() @@ -171,13 +211,8 @@ func newMetricsExporter(config Config, set exporter.CreateSettings, marshalers m if marshaler == nil { return nil, errUnrecognizedEncoding } - producer, err := newSaramaProducer(config) - if err != nil { - return nil, err - } - return &kafkaMetricsProducer{ - producer: producer, + cfg: config, topic: config.Topic, marshaler: marshaler, logger: set.Logger, @@ -196,12 +231,9 @@ func newTracesExporter(config Config, set exporter.CreateSettings, marshalers ma keyableMarshaler.Key() } } - producer, err := newSaramaProducer(config) - if err != nil { - return nil, err - } + return &kafkaTracesProducer{ - producer: producer, + cfg: config, topic: config.Topic, marshaler: marshaler, logger: set.Logger, @@ -213,13 +245,9 @@ func newLogsExporter(config Config, set exporter.CreateSettings, marshalers map[ if marshaler == nil { return nil, errUnrecognizedEncoding } - producer, err := newSaramaProducer(config) - if err != nil { - return nil, err - } return &kafkaLogsProducer{ - producer: producer, + cfg: config, topic: config.Topic, marshaler: marshaler, logger: set.Logger, diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index ab8956ca5e2c..ef91f0892d1e 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -12,6 +12,7 @@ import ( "github.com/IBM/sarama/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/plog" @@ -26,8 +27,9 @@ import ( func TestNewExporter_err_version(t *testing.T) { c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers()) + require.NoError(t, err) + err = texp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) - assert.Nil(t, texp) } func TestNewExporter_err_encoding(t *testing.T) { @@ -40,8 +42,9 @@ func TestNewExporter_err_encoding(t *testing.T) { func TestNewMetricsExporter_err_version(t *testing.T) { c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers()) + require.NoError(t, err) + err = mexp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) - assert.Nil(t, mexp) } func TestNewMetricsExporter_err_encoding(t *testing.T) { @@ -60,9 +63,10 @@ func TestNewMetricsExporter_err_traces_encoding(t *testing.T) { func TestNewLogsExporter_err_version(t *testing.T) { c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} - mexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers()) + lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers()) + require.NoError(t, err) + err = lexp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) - assert.Nil(t, mexp) } func TestNewLogsExporter_err_encoding(t *testing.T) { @@ -98,17 +102,20 @@ func TestNewExporter_err_auth_type(t *testing.T) { }, } texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers()) + require.NoError(t, err) + err = texp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") - assert.Nil(t, texp) mexp, err := newMetricsExporter(c, exportertest.NewNopCreateSettings(), metricsMarshalers()) + require.NoError(t, err) + err = mexp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") - assert.Nil(t, mexp) lexp, err := newLogsExporter(c, exportertest.NewNopCreateSettings(), logsMarshalers()) + require.NoError(t, err) + err = lexp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") - assert.Nil(t, lexp) } @@ -120,9 +127,10 @@ func TestNewExporter_err_compression(t *testing.T) { }, } texp, err := newTracesExporter(c, exportertest.NewNopCreateSettings(), tracesMarshalers()) + require.NoError(t, err) + err = texp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), "producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value idk") - assert.Nil(t, texp) } func TestTracesPusher(t *testing.T) { diff --git a/exporter/kafkaexporter/metadata.yaml b/exporter/kafkaexporter/metadata.yaml index 7f5bad2c9b10..0aa2d33bc065 100644 --- a/exporter/kafkaexporter/metadata.yaml +++ b/exporter/kafkaexporter/metadata.yaml @@ -9,7 +9,6 @@ status: codeowners: active: [pavolloffay, MovieStoreGuy] -# TODO: Update the exporter to pass the tests tests: + config: skip_lifecycle: true - skip_shutdown: true