Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable awscloudwatchlogsexporter and use cwlogs package to export logs #7152

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib v0.42.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.42.0
github.com/stretchr/testify v1.7.0
go.uber.org/multierr v1.7.0
go.opentelemetry.io/collector v0.42.0
go.opentelemetry.io/collector/model v0.42.0
go.uber.org/multierr v1.7.0
golang.org/x/mod v0.5.1
)

require go.uber.org/multierr v1.7.0

require (
bitbucket.org/atlassian/go-asap/v2 v2.6.0 // indirect
cloud.google.com/go v0.100.2 // indirect
Expand Down Expand Up @@ -192,6 +190,7 @@ require (
github.com/observiq/go-syslog/v3 v3.0.2 // indirect
github.com/olivere/elastic v6.2.37+incompatible // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter v0.42.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter v0.42.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter v0.42.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter v0.42.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsprometheusremotewriteexporter v0.42.0 // indirect
Expand Down Expand Up @@ -488,6 +487,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awski

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsprometheusremotewriteexporter => ../../exporter/awsprometheusremotewriteexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter => ../../exporter/awscloudwatchlogsexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter => ../../exporter/awsxrayexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter => ../../exporter/azuremonitorexporter
Expand Down
1 change: 1 addition & 0 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
)

// Config represent a configuration for the CloudWatch logs exporter.
Expand Down Expand Up @@ -48,6 +51,10 @@ type Config struct {
// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable due to how AWS CloudWatch API works
QueueSettings QueueSettings `mapstructure:"sending_queue"`

logger *zap.Logger

awsutil.AWSSessionSettings `mapstructure:",squash"`
}

type QueueSettings struct {
Expand Down
20 changes: 12 additions & 8 deletions exporter/awscloudwatchlogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -45,12 +47,13 @@ func TestLoadConfig(t *testing.T) {

assert.Equal(t,
&Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "e1-defaults")),
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Region: "",
Endpoint: "",
ExporterSettings: config.NewExporterSettings(config.NewComponentIDWithName(typeStr, "e1-defaults")),
RetrySettings: defaultRetrySettings,
LogGroupName: "test-1",
LogStreamName: "testing",
Region: "",
Endpoint: "",
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.DefaultQueueSettings().QueueSize,
},
Expand All @@ -69,8 +72,9 @@ func TestLoadConfig(t *testing.T) {
MaxInterval: defaultRetrySettings.MaxInterval,
MaxElapsedTime: defaultRetrySettings.MaxElapsedTime,
},
LogGroupName: "test-2",
LogStreamName: "testing",
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
LogGroupName: "test-2",
LogStreamName: "testing",
QueueSettings: QueueSettings{
QueueSize: 2,
},
Expand Down
176 changes: 107 additions & 69 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,109 +18,147 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)

type exporter struct {
config *Config
logger *zap.Logger
config config.Exporter
logger *zap.Logger
retryCount int
collectorID string
svcStructuredLog *cwlogs.Client
seqTokenMu sync.Mutex
// Keep track of all pushers created
// For every log group exists multiple log streams, for every log stream exists a Pusher
groupStreamToPusherMap map[string]map[string]cwlogs.Pusher
}

startOnce sync.Once
client *cloudwatchlogs.CloudWatchLogs // available after startOnce
func newCwLogsExporter(config config.Exporter, params component.ExporterCreateSettings) (component.LogsExporter, error) {
if config == nil {
return nil, errors.New("emf exporter config is nil")
}

seqTokenMu sync.Mutex
seqToken string
}
expConfig := config.(*Config)
expConfig.logger = params.Logger

func (e *exporter) Start(ctx context.Context, host component.Host) error {
var startErr error
e.startOnce.Do(func() {
awsConfig := &aws.Config{}
if e.config.Region != "" {
awsConfig.Region = aws.String(e.config.Region)
}
if e.config.Endpoint != "" {
awsConfig.Endpoint = aws.String(e.config.Endpoint)
}
awsConfig.MaxRetries = aws.Int(1) // retry will be handled by the collector queue
sess, err := session.NewSession(awsConfig)
if err != nil {
startErr = err
return
}
e.client = cloudwatchlogs.New(sess)
// create AWS session
awsConfig, session, err := awsutil.GetAWSConfigSession(params.Logger, &awsutil.Conn{}, &expConfig.AWSSessionSettings)
if err != nil {
return nil, err
}

e.logger.Debug("Retrieving CloudWatch sequence token")
out, err := e.client.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(e.config.LogGroupName),
LogStreamNamePrefix: aws.String(e.config.LogStreamName),
})
if err != nil {
startErr = err
return
}
if len(out.LogStreams) == 0 {
startErr = errors.New("cannot find log group and stream")
return
}
stream := out.LogStreams[0]
if stream.UploadSequenceToken == nil {
e.logger.Debug("CloudWatch sequence token is nil, will assume empty")
return
}
e.seqToken = *stream.UploadSequenceToken
})
return startErr
}
// create CWLogs client with aws session config
svcStructuredLog := cwlogs.NewClient(params.Logger, awsConfig, params.BuildInfo, expConfig.LogGroupName, session)
collectorIdentifier, err := uuid.NewRandom()

if err != nil {
return nil, err
}

expConfig.Validate()

logsExporter := &exporter{
svcStructuredLog: svcStructuredLog,
config: config,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
}
logsExporter.groupStreamToPusherMap = map[string]map[string]cwlogs.Pusher{}

return exporterhelper.NewLogsExporter(
config,
params,
logsExporter.PushLogs,
exporterhelper.WithQueue(expConfig.enforcedQueueSettings()),
exporterhelper.WithRetry(expConfig.RetrySettings),
)

func (e *exporter) Shutdown(ctx context.Context) error {
// TODO(jbd): Signal shutdown to flush the logs.
return nil
}

func (e *exporter) PushLogs(ctx context.Context, ld pdata.Logs) (err error) {
func (e *exporter) PushLogs(ctx context.Context, ld pdata.Logs) error {
// TODO(jbd): Relax this once CW Logs support ingest
// without sequence tokens.
e.seqTokenMu.Lock()
defer e.seqTokenMu.Unlock()

exp := e.config.(*Config)
cwLogsPusher := e.getLogPusher(exp.LogGroupName, exp.LogStreamName)
logEvents, _ := logsToCWLogs(e.logger, ld)
if len(logEvents) == 0 {
return nil
}

e.logger.Debug("Putting log events", zap.Int("num_of_events", len(logEvents)))
input := &cloudwatchlogs.PutLogEventsInput{
LogGroupName: aws.String(e.config.LogGroupName),
LogStreamName: aws.String(e.config.LogStreamName),
LogEvents: logEvents,
e.logger.Info("Putting log events", zap.Int("num_of_events", len(logEvents)))

for _, logEvent := range logEvents {
logEvent := &cwlogs.Event{
InputLogEvent: logEvent,
GeneratedTime: time.Now(),
}
e.logger.Debug("Adding log event", zap.Any("event", logEvent))
err := cwLogsPusher.AddLogEntry(logEvent)
if err != nil {
e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents)))
}
}
if e.seqToken != "" {
input.SequenceToken = aws.String(e.seqToken)
} else {
e.logger.Debug("Putting log events without a sequence token")
e.logger.Debug("Log events are successfully put")
flushErr := cwLogsPusher.ForceFlush()
if flushErr != nil {
e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr))
return flushErr
}
return nil
}

out, err := e.client.PutLogEvents(input)
if err != nil {
return err
func (e *exporter) ConsumeLogs(ctx context.Context, md pdata.Logs) error {
return e.PushLogs(ctx, md)
}

func (e *exporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *exporter) Shutdown(ctx context.Context) error {
exp := e.config.(*Config)
logPusher := e.getLogPusher(exp.LogGroupName, exp.LogStreamName)
logPusher.ForceFlush()
return nil
}
func (e *exporter) Start(ctx context.Context, host component.Host) error {
return nil
}

func (e *exporter) getLogPusher(logGroup, logStream string) cwlogs.Pusher {

var ok bool
var streamToPusherMap map[string]cwlogs.Pusher
if streamToPusherMap, ok = e.groupStreamToPusherMap[logGroup]; !ok {
streamToPusherMap = map[string]cwlogs.Pusher{}
e.groupStreamToPusherMap[logGroup] = streamToPusherMap
}
if info := out.RejectedLogEventsInfo; info != nil {
return fmt.Errorf("log event rejected: %s", info.String())

var logPusher cwlogs.Pusher
if logPusher, ok = streamToPusherMap[logStream]; !ok {
logPusher = cwlogs.NewPusher(aws.String(logGroup), aws.String(logStream), e.retryCount, *e.svcStructuredLog, e.logger)
streamToPusherMap[logStream] = logPusher
}
e.logger.Debug("Log events are successfully put")
return logPusher

e.seqToken = *out.NextSequenceToken
return nil
}

func logsToCWLogs(logger *zap.Logger, ld pdata.Logs) ([]*cloudwatchlogs.InputLogEvent, int) {
Expand Down
22 changes: 8 additions & 14 deletions exporter/awscloudwatchlogsexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
)

const typeStr = "awscloudwatchlogs"
Expand All @@ -36,28 +38,20 @@ func NewFactory() component.ExporterFactory {

func createDefaultConfig() config.Exporter {
return &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
RetrySettings: exporterhelper.DefaultRetrySettings(),
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
RetrySettings: exporterhelper.DefaultRetrySettings(),
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.DefaultQueueSettings().QueueSize,
},
}
}

func createLogsExporter(_ context.Context, set component.ExporterCreateSettings, cfg config.Exporter) (component.LogsExporter, error) {
oCfg, ok := cfg.(*Config)
func createLogsExporter(_ context.Context, params component.ExporterCreateSettings, config config.Exporter) (component.LogsExporter, error) {
expConfig, ok := config.(*Config)
if !ok {
return nil, errors.New("invalid configuration type; can't cast to awscloudwatchlogsexporter.Config")
}
return newCwLogsExporter(expConfig, params)

exporter := &exporter{config: oCfg, logger: set.Logger}
return exporterhelper.NewLogsExporter(
oCfg,
set,
exporter.PushLogs,
exporterhelper.WithStart(exporter.Start),
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithQueue(oCfg.enforcedQueueSettings()),
exporterhelper.WithRetry(oCfg.RetrySettings),
)
}
7 changes: 5 additions & 2 deletions exporter/awscloudwatchlogsexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
)

func TestDefaultConfig_exporterSettings(t *testing.T) {
want := &Config{
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
RetrySettings: exporterhelper.DefaultRetrySettings(),
ExporterSettings: config.NewExporterSettings(config.NewComponentID(typeStr)),
RetrySettings: exporterhelper.DefaultRetrySettings(),
AWSSessionSettings: awsutil.CreateDefaultSessionConfig(),
QueueSettings: QueueSettings{
QueueSize: exporterhelper.DefaultQueueSettings().QueueSize,
},
Expand Down
Loading