Skip to content

Commit

Permalink
[exporter/awskinesis] Restore Kinesis stream validation on the start (#…
Browse files Browse the repository at this point in the history
…18522)

The exporter implements the exporter interfaces but it's not being used, `exporterhelper.NewLogsExporter` is applied instead. This change removes the implementation and moves the Start method to `NewLogsExporter` that is currently ignored
  • Loading branch information
dmitryax authored Feb 13, 2023
1 parent 42d7bc8 commit 57af8a9
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 29 deletions.
11 changes: 11 additions & 0 deletions .chloggen/kinesis-exporter-fix-start.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/awskinesis

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Restore Kinesis stream validation on the start

# One or more tracking issues related to the change
issues: [18522]
31 changes: 5 additions & 26 deletions exporter/awskinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/sts"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -48,12 +46,6 @@ type options struct {
NewKinesisClient func(conf aws.Config, opts ...func(*kinesis.Options)) *kinesis.Client
}

var (
_ exporter.Traces = (*Exporter)(nil)
_ exporter.Metrics = (*Exporter)(nil)
_ exporter.Logs = (*Exporter)(nil)
)

func createExporter(ctx context.Context, c component.Config, log *zap.Logger, opts ...func(opt *options)) (*Exporter, error) {
options := &options{
NewKinesisClient: kinesis.NewFromConfig,
Expand Down Expand Up @@ -134,42 +126,29 @@ func createExporter(ctx context.Context, c component.Config, log *zap.Logger, op
}, nil
}

// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
func (e Exporter) Start(ctx context.Context, _ component.Host) error {
// start validates that the Kinesis stream is available.
func (e Exporter) start(ctx context.Context, _ component.Host) error {
return e.producer.Ready(ctx)
}

// Capabilities implements the consumer interface.
func (e Exporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// Shutdown is invoked during exporter shutdown.
func (e Exporter) Shutdown(context.Context) error {
return nil
}

// ConsumeTraces receives a span batch and exports it to AWS Kinesis
func (e Exporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
func (e Exporter) consumeTraces(ctx context.Context, td ptrace.Traces) error {
bt, err := e.batcher.Traces(td)
if err != nil {
return err
}
return e.producer.Put(ctx, bt)
}

func (e Exporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
func (e Exporter) consumeMetrics(ctx context.Context, md pmetric.Metrics) error {
bt, err := e.batcher.Metrics(md)
if err != nil {
return err
}
return e.producer.Put(ctx, bt)
}

func (e Exporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
func (e Exporter) consumeLogs(ctx context.Context, ld plog.Logs) error {
bt, err := e.batcher.Logs(ld)
if err != nil {
return err
Expand Down
9 changes: 6 additions & 3 deletions exporter/awskinesisexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func NewTracesExporter(ctx context.Context, params exporter.CreateSettings, conf
ctx,
params,
conf,
exp.ConsumeTraces,
exp.consumeTraces,
exporterhelper.WithStart(exp.start),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithRetry(c.RetrySettings),
exporterhelper.WithQueue(c.QueueSettings),
Expand All @@ -89,7 +90,8 @@ func NewMetricsExporter(ctx context.Context, params exporter.CreateSettings, con
ctx,
params,
c,
exp.ConsumeMetrics,
exp.consumeMetrics,
exporterhelper.WithStart(exp.start),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithRetry(c.RetrySettings),
exporterhelper.WithQueue(c.QueueSettings),
Expand All @@ -106,7 +108,8 @@ func NewLogsExporter(ctx context.Context, params exporter.CreateSettings, conf c
ctx,
params,
c,
exp.ConsumeLogs,
exp.consumeLogs,
exporterhelper.WithStart(exp.start),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithRetry(c.RetrySettings),
exporterhelper.WithQueue(c.QueueSettings),
Expand Down

0 comments on commit 57af8a9

Please sign in to comment.