Skip to content

Commit

Permalink
[exporter/awscloudwatchlogs] Flush logs on the exporter shutdown (#18518
Browse files Browse the repository at this point in the history
)

The exporter implements the `exporter.Traces` interface but the implementation is not being used, `exporterhelper.NewLogsExporter` is applied instead. This change removes the implementation and moves the methods to `NewLogsExporter` options. This enables the Shutdown method which is currently ignored
  • Loading branch information
dmitryax authored Feb 13, 2023
1 parent feb6c08 commit 6c0169a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
11 changes: 11 additions & 0 deletions .chloggen/cwlogs-exporter-fix-shutdown.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/awscloudwatch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Flush logs on the exporter shutdown

# One or more tracking issues related to the change
issues: [18518]
21 changes: 7 additions & 14 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type exporter struct {
pusher cwlogs.Pusher
}

func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (exp.Logs, error) {
func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, error) {
if expConfig == nil {
return nil, errors.New("awscloudwatchlogs exporter config is nil")
}
Expand Down Expand Up @@ -81,22 +81,23 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (exp.Logs, er

func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp.Logs, error) {
expConfig := config.(*Config)
logsExporter, err := newCwLogsPusher(expConfig, params)
logsPusher, err := newCwLogsPusher(expConfig, params)
if err != nil {
return nil, err
}
return exporterhelper.NewLogsExporter(
context.TODO(),
params,
config,
logsExporter.ConsumeLogs,
logsPusher.consumeLogs,
exporterhelper.WithQueue(expConfig.enforcedQueueSettings()),
exporterhelper.WithRetry(expConfig.RetrySettings),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithShutdown(logsPusher.shutdown),
)

}

func (e *exporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
func (e *exporter) consumeLogs(_ context.Context, ld plog.Logs) error {
cwLogsPusher := e.pusher
logEvents, _ := logsToCWLogs(e.logger, ld)
if len(logEvents) == 0 {
Expand All @@ -123,21 +124,13 @@ func (e *exporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return nil
}

func (e *exporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *exporter) Shutdown(ctx context.Context) error {
func (e *exporter) shutdown(_ context.Context) error {
if e.pusher != nil {
e.pusher.ForceFlush()
}
return nil
}

func (e *exporter) Start(ctx context.Context, host component.Host) error {
return nil
}

func logsToCWLogs(logger *zap.Logger, ld plog.Logs) ([]*cloudwatchlogs.InputLogEvent, int) {
n := ld.ResourceLogs().Len()
if n == 0 {
Expand Down
6 changes: 3 additions & 3 deletions exporter/awscloudwatchlogsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ func TestConsumeLogs(t *testing.T) {
logPusher := new(mockPusher)
logPusher.On("AddLogEntry", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("").Twice()
exp.(*exporter).pusher = logPusher
require.NoError(t, exp.(*exporter).ConsumeLogs(ctx, ld))
require.NoError(t, exp.Shutdown(ctx))
exp.pusher = logPusher
require.NoError(t, exp.consumeLogs(ctx, ld))
require.NoError(t, exp.shutdown(ctx))
}

func TestNewExporterWithoutRegionErr(t *testing.T) {
Expand Down

0 comments on commit 6c0169a

Please sign in to comment.