Skip to content

Commit

Permalink
enable awscloudwatchlogsexporter and use cwlogs package to export logs (
Browse files Browse the repository at this point in the history
#7152)

* enable awscloudwatchlogsexporter and use cwlogs package to export logs

* run gofmt

* fix import order

* remove empty string default val for loggroup and stream

* remove log

* remove seq token, save single pusher instead of map of pushers

* fix shutdown func

* remove groupToStream map

* remove duplicate region config var since its already in aws session util

* add default test cases

* fix messaging and test case

* format config file

* fix spacing

* remove getLogsPusher function

* remove awscloudwatchlogs component from gomod
  • Loading branch information
aateeqi authored Jan 20, 2022
1 parent 28adda5 commit 5440d66
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 105 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- `datadogexporter`: Add http.status_code tag to trace stats (#6889)
- `mongodbreceiver`: Add initial client code to the component (#7125)
- `tanzuobservabilityexporter`: Support delta histograms (#6897)
- `awscloudwatchlogsexporter`: Use cwlogs package to export logs (#7152)
- `mysqlreceiver`: Add the receiver to available components (#7078)
- `tanzuobservabilityexporter`: Documentation for the memory_limiter configuration (#7164)
- `dynatraceexporter`: Do not shut down exporter when metrics ingest module is temporarily unavailable (#7161)
Expand Down
11 changes: 7 additions & 4 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
)

// Config represent a configuration for the CloudWatch logs exporter.
Expand All @@ -35,10 +38,6 @@ type Config struct {
// that share the same source.
LogStreamName string `mapstructure:"log_stream_name"`

// Region is the AWS region where the logs are sent to.
// Optional.
Region string `mapstructure:"region"`

// Endpoint is the CloudWatch Logs service endpoint which the requests
// are forwarded to. https://docs.aws.amazon.com/general/latest/gr/cwl_region.html
// e.g. logs.us-east-1.amazonaws.com
Expand All @@ -48,6 +47,10 @@ type Config struct {
// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable due to how AWS CloudWatch API works
QueueSettings QueueSettings `mapstructure:"sending_queue"`

logger *zap.Logger

awsutil.AWSSessionSettings `mapstructure:",squash"`
}

type QueueSettings struct {
Expand Down
19 changes: 11 additions & 8 deletions exporter/awscloudwatchlogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -45,12 +47,12 @@ func TestLoadConfig(t *testing.T) {

assert.Equal(t,
&Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "e1-defaults")),
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Region: "",
Endpoint: "",
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "e1-defaults")),
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Endpoint: "",
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.DefaultQueueSettings().QueueSize,
},
Expand All @@ -69,8 +71,9 @@ func TestLoadConfig(t *testing.T) {
MaxInterval: defaultRetrySettings.MaxInterval,
MaxElapsedTime: defaultRetrySettings.MaxElapsedTime,
},
LogGroupName: "test-2",
LogStreamName: "testing",
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
LogGroupName: "test-2",
LogStreamName: "testing",
QueueSettings: QueueSettings{
QueueSize: 2,
},
Expand Down
162 changes: 88 additions & 74 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,108 +18,122 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)

type exporter struct {
config *Config
logger *zap.Logger
Config *Config
logger *zap.Logger
retryCount int
collectorID string
svcStructuredLog *cwlogs.Client
pusher cwlogs.Pusher
}

startOnce sync.Once
client *cloudwatchlogs.CloudWatchLogs // available after startOnce
func newCwLogsPusher(expConfig *Config, params component.ExporterCreateSettings) (component.LogsExporter, error) {
if expConfig == nil {
return nil, errors.New("awscloudwatchlogs exporter config is nil")
}

seqTokenMu sync.Mutex
seqToken string
}
expConfig.logger = params.Logger

func (e *exporter) Start(ctx context.Context, host component.Host) error {
var startErr error
e.startOnce.Do(func() {
awsConfig := &aws.Config{}
if e.config.Region != "" {
awsConfig.Region = aws.String(e.config.Region)
}
if e.config.Endpoint != "" {
awsConfig.Endpoint = aws.String(e.config.Endpoint)
}
awsConfig.MaxRetries = aws.Int(1) // retry will be handled by the collector queue
sess, err := session.NewSession(awsConfig)
if err != nil {
startErr = err
return
}
e.client = cloudwatchlogs.New(sess)
// create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(params.Logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings)
if err != nil {
return nil, err
}

e.logger.Debug("Retrieving CloudWatch sequence token")
out, err := e.client.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(e.config.LogGroupName),
LogStreamNamePrefix: aws.String(e.config.LogStreamName),
})
if err != nil {
startErr = err
return
}
if len(out.LogStreams) == 0 {
startErr = errors.New("cannot find log group and stream")
return
}
stream := out.LogStreams[0]
if stream.UploadSequenceToken == nil {
e.logger.Debug("CloudWatch sequence token is nil, will assume empty")
return
}
e.seqToken = *stream.UploadSequenceToken
})
return startErr
}
// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, session)
collectorIdentifier, err := uuid.NewRandom()

func (e *exporter) Shutdown(ctx context.Context) error {
// TODO(jbd): Signal shutdown to flush the logs.
return nil
if err != nil {
return nil, err
}

expConfig.Validate()

pusher := cwlogs.NewPusher(aws.String(expConfig.LogGroupName), aws.String(expConfig.LogStreamName), *awsConfig.MaxRetries, *svcStructuredLog, params.Logger)

logsExporter := &exporter{
svcStructuredLog: svcStructuredLog,
Config: expConfig,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusher: pusher,
}
return logsExporter, nil
}

func (e *exporter) PushLogs(ctx context.Context, ld pdata.Logs) (err error) {
// TODO(jbd): Relax this once CW Logs support ingest
// without sequence tokens.
e.seqTokenMu.Lock()
defer e.seqTokenMu.Unlock()
func newCwLogsExporter(config config.Exporter, params component.ExporterCreateSettings) (component.LogsExporter, error) {
expConfig := config.(*Config)
logsExporter, err := newCwLogsPusher(expConfig, params)
if err != nil {
return nil, err
}
return exporterhelper.NewLogsExporter(
config,
params,
logsExporter.ConsumeLogs,
exporterhelper.WithQueue(expConfig.enforcedQueueSettings()),
exporterhelper.WithRetry(expConfig.RetrySettings),
)

}

func (e *exporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
cwLogsPusher := e.pusher
logEvents, _ := logsToCWLogs(e.logger, ld)
if len(logEvents) == 0 {
return nil
}

e.logger.Debug("Putting log events", zap.Int("num_of_events", len(logEvents)))
input := &cloudwatchlogs.PutLogEventsInput{
LogGroupName: aws.String(e.config.LogGroupName),
LogStreamName: aws.String(e.config.LogStreamName),
LogEvents: logEvents,
for _, logEvent := range logEvents {
logEvent := &cwlogs.Event{
InputLogEvent: logEvent,
GeneratedTime: time.Now(),
}
e.logger.Debug("Adding log event", zap.Any("event", logEvent))
err := cwLogsPusher.AddLogEntry(logEvent)
if err != nil {
e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents)))
}
}
if e.seqToken != "" {
input.SequenceToken = aws.String(e.seqToken)
} else {
e.logger.Debug("Putting log events without a sequence token")
e.logger.Debug("Log events are successfully put")
flushErr := cwLogsPusher.ForceFlush()
if flushErr != nil {
e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr))
return flushErr
}
return nil
}

out, err := e.client.PutLogEvents(input)
if err != nil {
return err
}
if info := out.RejectedLogEventsInfo; info != nil {
return fmt.Errorf("log event rejected: %s", info.String())
func (e *exporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *exporter) Shutdown(ctx context.Context) error {
if e.pusher != nil {
e.pusher.ForceFlush()
}
e.logger.Debug("Log events are successfully put")
return nil
}

e.seqToken = *out.NextSequenceToken
func (e *exporter) Start(ctx context.Context, host component.Host) error {
return nil
}

Expand Down
66 changes: 66 additions & 0 deletions exporter/awscloudwatchlogsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,43 @@
package awscloudwatchlogsexporter

import (
"context"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/model/pdata"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)

type mockPusher struct {
mock.Mock
}

func (p *mockPusher) AddLogEntry(logEvent *cwlogs.Event) error {
args := p.Called(nil)
errorStr := args.String(0)
if errorStr != "" {
return awserr.NewRequestFailure(nil, 400, "").(error)
}
return nil
}

func (p *mockPusher) ForceFlush() error {
args := p.Called(nil)
errorStr := args.String(0)
if errorStr != "" {
return awserr.NewRequestFailure(nil, 400, "").(error)
}
return nil
}

func TestLogToCWLog(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -200,3 +229,40 @@ func TestAttrValue(t *testing.T) {
})
}
}

func TestConsumeLogs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.LogGroupName = "testGroup"
expCfg.LogStreamName = "testStream"
expCfg.MaxRetries = 0
exp, err := newCwLogsPusher(expCfg, componenttest.NewNopExporterCreateSettings())
assert.Nil(t, err)
assert.NotNil(t, exp)
ld := pdata.NewLogs()
r := ld.ResourceLogs().AppendEmpty()
r.Resource().Attributes().UpsertString("hello", "test")
logRecords := r.InstrumentationLibraryLogs().AppendEmpty().Logs()
logRecords.EnsureCapacity(5)
logRecords.AppendEmpty().SetName("test")
assert.Equal(t, 1, ld.LogRecordCount())

logPusher := new(mockPusher)
logPusher.On("AddLogEntry", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("").Twice()
exp.(*exporter).pusher = logPusher
require.NoError(t, exp.(*exporter).ConsumeLogs(ctx, ld))
require.NoError(t, exp.Shutdown(ctx))
}

func TestNewExporterWithoutRegionErr(t *testing.T) {
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.MaxRetries = 0
exp, err := newCwLogsExporter(expCfg, componenttest.NewNopExporterCreateSettings())
assert.Nil(t, exp)
assert.NotNil(t, err)
}
Loading

0 comments on commit 5440d66

Please sign in to comment.