Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable awscloudwatchlogsexporter and use cwlogs package to export logs #7152

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- `datadogexporter`: Add http.status_code tag to trace stats (#6889)
- `mongodbreceiver`: Add initial client code to the component (#7125)
- `tanzuobservabilityexporter`: Support delta histograms (#6897)
- `awscloudwatchlogsexporter`: Use cwlogs package to export logs (#7152)
- `mysqlreceiver`: Add the receiver to available components (#7078)
- `tanzuobservabilityexporter`: Documentation for the memory_limiter configuration (#7164)
- `dynatraceexporter`: Do not shut down exporter when metrics ingest module is temporarily unavailable (#7161)
Expand Down
11 changes: 7 additions & 4 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
)

// Config represent a configuration for the CloudWatch logs exporter.
Expand All @@ -35,10 +38,6 @@ type Config struct {
// that share the same source.
LogStreamName string `mapstructure:"log_stream_name"`

// Region is the AWS region where the logs are sent to.
// Optional.
Region string `mapstructure:"region"`

// Endpoint is the CloudWatch Logs service endpoint which the requests
// are forwarded to. https://docs.aws.amazon.com/general/latest/gr/cwl_region.html
// e.g. logs.us-east-1.amazonaws.com
Expand All @@ -48,6 +47,10 @@ type Config struct {
// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable due to how AWS CloudWatch API works
QueueSettings QueueSettings `mapstructure:"sending_queue"`

logger *zap.Logger

awsutil.AWSSessionSettings `mapstructure:",squash"`
}

type QueueSettings struct {
Expand Down
19 changes: 11 additions & 8 deletions exporter/awscloudwatchlogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -45,12 +47,12 @@ func TestLoadConfig(t *testing.T) {

assert.Equal(t,
&Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "e1-defaults")),
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Region: "",
Endpoint: "",
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "e1-defaults")),
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Endpoint: "",
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.DefaultQueueSettings().QueueSize,
},
Expand All @@ -69,8 +71,9 @@ func TestLoadConfig(t *testing.T) {
MaxInterval: defaultRetrySettings.MaxInterval,
MaxElapsedTime: defaultRetrySettings.MaxElapsedTime,
},
LogGroupName: "test-2",
LogStreamName: "testing",
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
LogGroupName: "test-2",
LogStreamName: "testing",
QueueSettings: QueueSettings{
QueueSize: 2,
},
Expand Down
162 changes: 88 additions & 74 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,108 +18,122 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)

type exporter struct {
config *Config
logger *zap.Logger
Config *Config
logger *zap.Logger
retryCount int
collectorID string
svcStructuredLog *cwlogs.Client
pusher cwlogs.Pusher
}

startOnce sync.Once
client *cloudwatchlogs.CloudWatchLogs // available after startOnce
func newCwLogsPusher(expConfig *Config, params component.ExporterCreateSettings) (component.LogsExporter, error) {
if expConfig == nil {
return nil, errors.New("awscloudwatchlogs exporter config is nil")
}

seqTokenMu sync.Mutex
seqToken string
}
expConfig.logger = params.Logger

func (e *exporter) Start(ctx context.Context, host component.Host) error {
var startErr error
e.startOnce.Do(func() {
awsConfig := &aws.Config{}
if e.config.Region != "" {
awsConfig.Region = aws.String(e.config.Region)
}
if e.config.Endpoint != "" {
awsConfig.Endpoint = aws.String(e.config.Endpoint)
}
awsConfig.MaxRetries = aws.Int(1) // retry will be handled by the collector queue
sess, err := session.NewSession(awsConfig)
if err != nil {
startErr = err
return
}
e.client = cloudwatchlogs.New(sess)
// create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(params.Logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings)
if err != nil {
return nil, err
}

e.logger.Debug("Retrieving CloudWatch sequence token")
out, err := e.client.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(e.config.LogGroupName),
LogStreamNamePrefix: aws.String(e.config.LogStreamName),
})
if err != nil {
startErr = err
return
}
if len(out.LogStreams) == 0 {
startErr = errors.New("cannot find log group and stream")
return
}
stream := out.LogStreams[0]
if stream.UploadSequenceToken == nil {
e.logger.Debug("CloudWatch sequence token is nil, will assume empty")
return
}
e.seqToken = *stream.UploadSequenceToken
})
return startErr
}
// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, session)
collectorIdentifier, err := uuid.NewRandom()

func (e *exporter) Shutdown(ctx context.Context) error {
// TODO(jbd): Signal shutdown to flush the logs.
return nil
if err != nil {
return nil, err
}

expConfig.Validate()

pusher := cwlogs.NewPusher(aws.String(expConfig.LogGroupName), aws.String(expConfig.LogStreamName), *awsConfig.MaxRetries, *svcStructuredLog, params.Logger)

logsExporter := &exporter{
svcStructuredLog: svcStructuredLog,
Config: expConfig,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusher: pusher,
}
return logsExporter, nil
}

func (e *exporter) PushLogs(ctx context.Context, ld pdata.Logs) (err error) {
// TODO(jbd): Relax this once CW Logs support ingest
// without sequence tokens.
e.seqTokenMu.Lock()
defer e.seqTokenMu.Unlock()
func newCwLogsExporter(config config.Exporter, params component.ExporterCreateSettings) (component.LogsExporter, error) {
expConfig := config.(*Config)
logsExporter, err := newCwLogsPusher(expConfig, params)
if err != nil {
return nil, err
}
return exporterhelper.NewLogsExporter(
config,
params,
logsExporter.ConsumeLogs,
exporterhelper.WithQueue(expConfig.enforcedQueueSettings()),
exporterhelper.WithRetry(expConfig.RetrySettings),
)

}

func (e *exporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
cwLogsPusher := e.pusher
logEvents, _ := logsToCWLogs(e.logger, ld)
if len(logEvents) == 0 {
return nil
}

e.logger.Debug("Putting log events", zap.Int("num_of_events", len(logEvents)))
input := &cloudwatchlogs.PutLogEventsInput{
LogGroupName: aws.String(e.config.LogGroupName),
LogStreamName: aws.String(e.config.LogStreamName),
LogEvents: logEvents,
for _, logEvent := range logEvents {
logEvent := &cwlogs.Event{
InputLogEvent: logEvent,
GeneratedTime: time.Now(),
}
e.logger.Debug("Adding log event", zap.Any("event", logEvent))
err := cwLogsPusher.AddLogEntry(logEvent)
if err != nil {
e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents)))
}
}
if e.seqToken != "" {
input.SequenceToken = aws.String(e.seqToken)
} else {
e.logger.Debug("Putting log events without a sequence token")
e.logger.Debug("Log events are successfully put")
flushErr := cwLogsPusher.ForceFlush()
if flushErr != nil {
e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr))
return flushErr
}
return nil
}

out, err := e.client.PutLogEvents(input)
if err != nil {
return err
}
if info := out.RejectedLogEventsInfo; info != nil {
return fmt.Errorf("log event rejected: %s", info.String())
func (e *exporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *exporter) Shutdown(ctx context.Context) error {
if e.pusher != nil {
e.pusher.ForceFlush()
}
e.logger.Debug("Log events are successfully put")
return nil
}

e.seqToken = *out.NextSequenceToken
func (e *exporter) Start(ctx context.Context, host component.Host) error {
return nil
}

Expand Down
66 changes: 66 additions & 0 deletions exporter/awscloudwatchlogsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,43 @@
package awscloudwatchlogsexporter

import (
"context"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/model/pdata"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)

type mockPusher struct {
mock.Mock
}

func (p *mockPusher) AddLogEntry(logEvent *cwlogs.Event) error {
args := p.Called(nil)
errorStr := args.String(0)
if errorStr != "" {
return awserr.NewRequestFailure(nil, 400, "").(error)
}
return nil
}

func (p *mockPusher) ForceFlush() error {
args := p.Called(nil)
errorStr := args.String(0)
if errorStr != "" {
return awserr.NewRequestFailure(nil, 400, "").(error)
}
return nil
}

func TestLogToCWLog(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -200,3 +229,40 @@ func TestAttrValue(t *testing.T) {
})
}
}

func TestConsumeLogs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.LogGroupName = "testGroup"
expCfg.LogStreamName = "testStream"
expCfg.MaxRetries = 0
exp, err := newCwLogsPusher(expCfg, componenttest.NewNopExporterCreateSettings())
assert.Nil(t, err)
assert.NotNil(t, exp)
ld := pdata.NewLogs()
r := ld.ResourceLogs().AppendEmpty()
r.Resource().Attributes().UpsertString("hello", "test")
logRecords := r.InstrumentationLibraryLogs().AppendEmpty().Logs()
logRecords.EnsureCapacity(5)
logRecords.AppendEmpty().SetName("test")
assert.Equal(t, 1, ld.LogRecordCount())

logPusher := new(mockPusher)
logPusher.On("AddLogEntry", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("").Twice()
exp.(*exporter).pusher = logPusher
require.NoError(t, exp.(*exporter).ConsumeLogs(ctx, ld))
require.NoError(t, exp.Shutdown(ctx))
}

func TestNewExporterWithoutRegionErr(t *testing.T) {
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.MaxRetries = 0
exp, err := newCwLogsExporter(expCfg, componenttest.NewNopExporterCreateSettings())
assert.Nil(t, exp)
assert.NotNil(t, err)
}
Loading