diff --git a/.chloggen/cwlogs-exporter-fix-shutdown.yaml b/.chloggen/cwlogs-exporter-fix-shutdown.yaml new file mode 100644 index 000000000000..9aab1801cbd1 --- /dev/null +++ b/.chloggen/cwlogs-exporter-fix-shutdown.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/awscloudwatch + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Flush logs on the exporter shutdown + +# One or more tracking issues related to the change +issues: [18518] diff --git a/exporter/awscloudwatchlogsexporter/exporter.go b/exporter/awscloudwatchlogsexporter/exporter.go index 83cf41f948e7..c84470c2f7fa 100644 --- a/exporter/awscloudwatchlogsexporter/exporter.go +++ b/exporter/awscloudwatchlogsexporter/exporter.go @@ -45,7 +45,7 @@ type exporter struct { pusher cwlogs.Pusher } -func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (exp.Logs, error) { +func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, error) { if expConfig == nil { return nil, errors.New("awscloudwatchlogs exporter config is nil") } @@ -81,7 +81,7 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (exp.Logs, er func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp.Logs, error) { expConfig := config.(*Config) - logsExporter, err := newCwLogsPusher(expConfig, params) + logsPusher, err := newCwLogsPusher(expConfig, params) if err != nil { return nil, err } @@ -89,14 +89,15 @@ func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp. context.TODO(), params, config, - logsExporter.ConsumeLogs, + logsPusher.consumeLogs, exporterhelper.WithQueue(expConfig.enforcedQueueSettings()), exporterhelper.WithRetry(expConfig.RetrySettings), + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithShutdown(logsPusher.shutdown), ) - } -func (e *exporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (e *exporter) consumeLogs(_ context.Context, ld plog.Logs) error { cwLogsPusher := e.pusher logEvents, _ := logsToCWLogs(e.logger, ld) if len(logEvents) == 0 { @@ -123,21 +124,13 @@ func (e *exporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return nil } -func (e *exporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (e *exporter) Shutdown(ctx context.Context) error { +func (e *exporter) shutdown(_ context.Context) error { if e.pusher != nil { e.pusher.ForceFlush() } return nil } -func (e *exporter) Start(ctx context.Context, host component.Host) error { - return nil -} - func logsToCWLogs(logger *zap.Logger, ld plog.Logs) ([]*cloudwatchlogs.InputLogEvent, int) { n := ld.ResourceLogs().Len() if n == 0 { diff --git a/exporter/awscloudwatchlogsexporter/exporter_test.go b/exporter/awscloudwatchlogsexporter/exporter_test.go index a2ee98a0f900..bdd590527a3b 100644 --- a/exporter/awscloudwatchlogsexporter/exporter_test.go +++ b/exporter/awscloudwatchlogsexporter/exporter_test.go @@ -174,9 +174,9 @@ func TestConsumeLogs(t *testing.T) { logPusher := new(mockPusher) logPusher.On("AddLogEntry", nil).Return("").Once() logPusher.On("ForceFlush", nil).Return("").Twice() - exp.(*exporter).pusher = logPusher - require.NoError(t, exp.(*exporter).ConsumeLogs(ctx, ld)) - require.NoError(t, exp.Shutdown(ctx)) + exp.pusher = logPusher + require.NoError(t, exp.consumeLogs(ctx, ld)) + require.NoError(t, exp.shutdown(ctx)) } func TestNewExporterWithoutRegionErr(t *testing.T) {