
Commit

Revert "Add metrics to Kafka exporter (#1966)" (#2222)
tigrannajaryan authored Nov 27, 2020
1 parent 42a7f66 commit e48f95f
Showing 13 changed files with 84 additions and 349 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
@@ -2,9 +2,6 @@

## Unreleased

## 💡 Enhancements 💡

- `kafka` exporter: Add support for exporting metrics as otlp Protobuf. #1966
## v0.15.0 Beta

## 🛑 Breaking changes 🛑
11 changes: 5 additions & 6 deletions exporter/kafkaexporter/README.md
@@ -9,12 +9,11 @@ The following settings are required:

The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics): The name of the kafka topic to export to.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics.
- The following encodings are valid *only* for **traces**.
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`.
- `topic` (default = otlp_spans): The name of the kafka topic to export to
- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings:
- `otlp_proto`: the payload is serialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`.
- `auth`
- `plain_text`
- `username`: The username to use.
5 changes: 2 additions & 3 deletions exporter/kafkaexporter/config.go
@@ -32,10 +32,9 @@ type Config struct {
Brokers []string `mapstructure:"brokers"`
// Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
// The name of the kafka topic to export to (default "otlp_spans")
Topic string `mapstructure:"topic"`

// Encoding of messages (default "otlp_proto")
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

// Metadata is the namespace for metadata management properties used by the
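For orientation, here is a minimal sketch of how the post-revert defaults documented in the README map onto the Config struct above. It is an illustration only: the import path and the bare struct literal are assumptions, not part of this commit.

```go
package main

import (
	"fmt"

	// Assumed import path for the exporter package at this point in history.
	"go.opentelemetry.io/collector/exporter/kafkaexporter"
)

func main() {
	// Traces-only defaults after the revert: one local broker, the
	// "otlp_spans" topic, and OTLP protobuf encoding.
	cfg := kafkaexporter.Config{
		Brokers:  []string{"localhost:9092"},
		Topic:    "otlp_spans",
		Encoding: "otlp_proto",
	}
	fmt.Printf("exporting to %q on %v with encoding %s\n", cfg.Topic, cfg.Brokers, cfg.Encoding)
}
```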
59 changes: 13 additions & 46 deletions exporter/kafkaexporter/factory.go
@@ -24,11 +24,10 @@ import (
)

const (
typeStr = "kafka"
defaultTracesTopic = "otlp_spans"
defaultMetricsTopic = "otlp_metrics"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
typeStr = "kafka"
defaultTopic = "otlp_spans"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
// default from sarama.NewConfig()
defaultMetadataRetryMax = 3
// default from sarama.NewConfig()
@@ -40,29 +39,27 @@
// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaExporterFactory)

// WithAddTracesMarshallers adds tracesMarshallers.
func WithAddTracesMarshallers(encodingMarshaller map[string]TracesMarshaller) FactoryOption {
// WithAddMarshallers adds marshallers.
func WithAddMarshallers(encodingMarshaller map[string]Marshaller) FactoryOption {
return func(factory *kafkaExporterFactory) {
for encoding, marshaller := range encodingMarshaller {
factory.tracesMarshallers[encoding] = marshaller
factory.marshallers[encoding] = marshaller
}
}
}

// NewFactory creates Kafka exporter factory.
func NewFactory(options ...FactoryOption) component.ExporterFactory {
f := &kafkaExporterFactory{
tracesMarshallers: tracesMarshallers(),
metricsMarshallers: metricsMarshallers(),
marshallers: defaultMarshallers(),
}
for _, o := range options {
o(f)
}
return exporterhelper.NewFactory(
typeStr,
createDefaultConfig,
exporterhelper.WithTraces(f.createTraceExporter),
exporterhelper.WithMetrics(f.createMetricsExporter))
exporterhelper.WithTraces(f.createTraceExporter))
}

func createDefaultConfig() configmodels.Exporter {
@@ -75,9 +72,8 @@ func createDefaultConfig() configmodels.Exporter {
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: exporterhelper.CreateDefaultQueueSettings(),
Brokers: []string{defaultBroker},
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
Topic: "",
Encoding: defaultEncoding,
Topic: defaultTopic,
Encoding: defaultEncoding,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
@@ -89,8 +85,7 @@
}

type kafkaExporterFactory struct {
tracesMarshallers map[string]TracesMarshaller
metricsMarshallers map[string]MetricsMarshaller
marshallers map[string]Marshaller
}

func (f *kafkaExporterFactory) createTraceExporter(
@@ -99,10 +94,7 @@ func (f *kafkaExporterFactory) createTraceExporter(
cfg configmodels.Exporter,
) (component.TracesExporter, error) {
oCfg := cfg.(*Config)
if oCfg.Topic == "" {
oCfg.Topic = defaultTracesTopic
}
exp, err := newTracesExporter(*oCfg, params, f.tracesMarshallers)
exp, err := newExporter(*oCfg, params, f.marshallers)
if err != nil {
return nil, err
}
@@ -117,28 +109,3 @@ func (f *kafkaExporterFactory) createTraceExporter(
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithShutdown(exp.Close))
}

func (f *kafkaExporterFactory) createMetricsExporter(
_ context.Context,
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.MetricsExporter, error) {
oCfg := cfg.(*Config)
if oCfg.Topic == "" {
oCfg.Topic = defaultMetricsTopic
}
exp, err := newMetricsExporter(*oCfg, params, f.metricsMarshallers)
if err != nil {
return nil, err
}
return exporterhelper.NewMetricsExporter(
cfg,
params.Logger,
exp.metricsDataPusher,
// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
// and will rely on the sarama Producer Timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithShutdown(exp.Close))
}
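
As a sketch of the factory hook restored above, the snippet below registers a hypothetical custom encoding through WithAddMarshallers. Only NewFactory, FactoryOption, WithAddMarshallers, Marshaller, and Message are taken from the diff; the rawSpansMarshaller type, its behavior, and the import paths are assumptions for illustration.

```go
package main

import (
	// Assumed import paths for this point in the repository's history.
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/exporter/kafkaexporter"
)

// rawSpansMarshaller is a hypothetical encoding; it only mirrors the method
// set the Marshaller interface requires (Marshal and Encoding).
type rawSpansMarshaller struct{}

func (rawSpansMarshaller) Marshal(traces pdata.Traces) ([]kafkaexporter.Message, error) {
	// A real implementation would serialize the spans here.
	return nil, nil
}

func (rawSpansMarshaller) Encoding() string { return "raw_spans" }

func main() {
	// Register the custom encoding alongside the defaults; it is selected by
	// setting `encoding: raw_spans` in the exporter configuration.
	factory := kafkaexporter.NewFactory(
		kafkaexporter.WithAddMarshallers(map[string]kafkaexporter.Marshaller{
			"raw_spans": rawSpansMarshaller{},
		}),
	)
	_ = factory
}
```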
36 changes: 7 additions & 29 deletions exporter/kafkaexporter/factory_test.go
@@ -32,7 +32,7 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, "", cfg.Topic)
assert.Equal(t, defaultTopic, cfg.Topic)
}

func TestCreateTracesExporter(t *testing.T) {
@@ -41,48 +41,26 @@ func TestCreateTracesExporter(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
f := kafkaExporterFactory{tracesMarshallers: tracesMarshallers()}
f := kafkaExporterFactory{marshallers: defaultMarshallers()}
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestCreateMetricsExport(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
mf := kafkaExporterFactory{metricsMarshallers: metricsMarshallers()}
mr, err := mf.createMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
assert.NotNil(t, mr)
}

func TestCreateTracesExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaExporterFactory{tracesMarshallers: tracesMarshallers()}
f := kafkaExporterFactory{marshallers: defaultMarshallers()}
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}

func TestCreateMetricsExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
mf := kafkaExporterFactory{metricsMarshallers: metricsMarshallers()}
mr, err := mf.createMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.Error(t, err)
assert.Nil(t, mr)
}

func TestWithMarshallers(t *testing.T) {
cm := &customMarshaller{}
f := NewFactory(WithAddTracesMarshallers(map[string]TracesMarshaller{cm.Encoding(): cm}))
f := NewFactory(WithAddMarshallers(map[string]Marshaller{cm.Encoding(): cm}))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
Expand All @@ -94,7 +72,7 @@ func TestWithMarshallers(t *testing.T) {
require.NotNil(t, exporter)
})
t.Run("default_encoding", func(t *testing.T) {
cfg.Encoding = new(otlpTracesPbMarshaller).Encoding()
cfg.Encoding = new(otlpProtoMarshaller).Encoding()
exporter, err := f.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
assert.NotNil(t, exporter)
@@ -104,9 +82,9 @@ func TestWithMarshallers(t *testing.T) {
type customMarshaller struct {
}

var _ TracesMarshaller = (*customMarshaller)(nil)
var _ Marshaller = (*customMarshaller)(nil)

func (c customMarshaller) Marshal(_ pdata.Traces) ([]Message, error) {
func (c customMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
panic("implement me")
}

2 changes: 1 addition & 1 deletion exporter/kafkaexporter/jaeger_marshaller.go
@@ -29,7 +29,7 @@ type jaegerMarshaller struct {
marshaller jaegerSpanMarshaller
}

var _ TracesMarshaller = (*jaegerMarshaller)(nil)
var _ Marshaller = (*jaegerMarshaller)(nil)

func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
batches, err := jaegertranslator.InternalTracesToJaegerProto(traces)
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/jaeger_marshaller_test.go
@@ -49,7 +49,7 @@ func TestJaegerMarshaller(t *testing.T) {
require.NoError(t, jsonMarshaller.Marshal(jsonByteBuffer, batches[0].Spans[0]))

tests := []struct {
unmarshaller TracesMarshaller
unmarshaller Marshaller
encoding string
messages []Message
}{
90 changes: 20 additions & 70 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -28,55 +28,21 @@ import (

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")

// kafkaTracesProducer uses sarama to produce trace messages to Kafka.
type kafkaTracesProducer struct {
// kafkaProducer uses sarama to produce messages to Kafka.
type kafkaProducer struct {
producer sarama.SyncProducer
topic string
marshaller TracesMarshaller
marshaller Marshaller
logger *zap.Logger
}

func (e *kafkaTracesProducer) traceDataPusher(_ context.Context, td pdata.Traces) (int, error) {
messages, err := e.marshaller.Marshal(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
}
err = e.producer.SendMessages(producerMessages(messages, e.topic))
if err != nil {
return td.SpanCount(), err
}
return 0, nil
}

func (e *kafkaTracesProducer) Close(context.Context) error {
return e.producer.Close()
}

// kafkaMetricsProducer uses sarama to produce metrics messages to kafka
type kafkaMetricsProducer struct {
producer sarama.SyncProducer
topic string
marshaller MetricsMarshaller
logger *zap.Logger
}

func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pdata.Metrics) (int, error) {
messages, err := e.marshaller.Marshal(md)
if err != nil {
return md.MetricCount(), consumererror.Permanent(err)
}
err = e.producer.SendMessages(producerMessages(messages, e.topic))
if err != nil {
return md.MetricCount(), err
// newExporter creates Kafka exporter.
func newExporter(config Config, params component.ExporterCreateParams, marshallers map[string]Marshaller) (*kafkaProducer, error) {
marshaller := marshallers[config.Encoding]
if marshaller == nil {
return nil, errUnrecognizedEncoding
}
return 0, nil
}

func (e *kafkaMetricsProducer) Close(context.Context) error {
return e.producer.Close()
}

func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
c := sarama.NewConfig()
// These setting are required by the sarama.SyncProducer implementation.
c.Producer.Return.Successes = true
@@ -102,44 +68,28 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
if err != nil {
return nil, err
}
return producer, nil
}

func newMetricsExporter(config Config, params component.ExporterCreateParams, marshallers map[string]MetricsMarshaller) (*kafkaMetricsProducer, error) {
marshaller := marshallers[config.Encoding]
if marshaller == nil {
return nil, errUnrecognizedEncoding
}
producer, err := newSaramaProducer(config)
if err != nil {
return nil, err
}

return &kafkaMetricsProducer{
return &kafkaProducer{
producer: producer,
topic: config.Topic,
marshaller: marshaller,
logger: params.Logger,
}, nil

}

// newTracesExporter creates Kafka exporter.
func newTracesExporter(config Config, params component.ExporterCreateParams, marshallers map[string]TracesMarshaller) (*kafkaTracesProducer, error) {
marshaller := marshallers[config.Encoding]
if marshaller == nil {
return nil, errUnrecognizedEncoding
func (e *kafkaProducer) traceDataPusher(_ context.Context, td pdata.Traces) (int, error) {
messages, err := e.marshaller.Marshal(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
}
producer, err := newSaramaProducer(config)
err = e.producer.SendMessages(producerMessages(messages, e.topic))
if err != nil {
return nil, err
return td.SpanCount(), err
}
return &kafkaTracesProducer{
producer: producer,
topic: config.Topic,
marshaller: marshaller,
logger: params.Logger,
}, nil
return 0, nil
}

func (e *kafkaProducer) Close(context.Context) error {
return e.producer.Close()
}

func producerMessages(messages []Message, topic string) []*sarama.ProducerMessage {
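The traceDataPusher above treats marshalling failures as permanent and broker send failures as retryable. A minimal sketch of that contract, assuming the consumererror package also exposes an IsPermanent helper (only consumererror.Permanent appears in this diff):

```go
package main

import (
	"errors"
	"fmt"

	// Assumed import path; only Permanent is confirmed by the diff,
	// IsPermanent is an assumption about the package's API.
	"go.opentelemetry.io/collector/consumer/consumererror"
)

func main() {
	// Marshalling errors are wrapped so the retry logic in exporterhelper
	// does not retry them.
	marshalErr := consumererror.Permanent(errors.New("unrecognized encoding"))
	// A send failure is returned unwrapped and therefore stays retryable.
	sendErr := errors.New("kafka: client has run out of available brokers to talk to")

	fmt.Println(consumererror.IsPermanent(marshalErr)) // expected: true
	fmt.Println(consumererror.IsPermanent(sendErr))    // expected: false
}
```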
(Diffs for the remaining changed files in this commit are not shown.)
