From a77e3afb76c770214918318fa67b46a3383197e4 Mon Sep 17 00:00:00 2001 From: jsirianni Date: Thu, 16 Sep 2021 12:44:27 -0400 Subject: [PATCH] port https://github.com/observIQ/stanza/pull/420 --- .../input/aws/cloudwatch/cloudwatch.go | 156 ++++++++++++++---- .../aws/cloudwatch/cloudwatch_persist.go | 3 + .../input/aws/cloudwatch/cloudwatch_test.go | 137 ++++++++++++++- 3 files changed, 260 insertions(+), 36 deletions(-) diff --git a/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch.go b/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch.go index 83983714f..227eb28fb 100644 --- a/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch.go +++ b/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch.go @@ -36,9 +36,17 @@ func NewCloudwatchConfig(operatorID string) *CloudwatchInputConfig { type CloudwatchInputConfig struct { helper.InputConfig `yaml:",inline"` - // required + // LogGroupName is deprecated but still supported for compatibility with older configurations LogGroupName string `json:"log_group_name,omitempty" yaml:"log_group_name,omitempty"` - Region string `json:"region,omitempty" yaml:"region,omitempty"` + + // LogGroups is a list of log groups + LogGroups []string `json:"log_groups,omitempty" yaml:"log_groups,omitempty"` + + // LogGroupPrefix is used to append to LogGroups and can be used in conjunction with LogGroups + LogGroupPrefix string `json:"log_group_prefix,omitempty" yaml:"log_group_prefix,omitempty"` + + // Region is the AWS region to target + Region string `json:"region,omitempty" yaml:"region,omitempty"` // optional LogStreamNamePrefix string `json:"log_stream_name_prefix,omitempty" yaml:"log_stream_name_prefix,omitempty"` @@ -56,12 +64,13 @@ func (c *CloudwatchInputConfig) Build(buildContext operator.BuildContext) ([]ope return nil, err } - if c.LogGroupName == "" { - return nil, fmt.Errorf("missing required %s parameter 'log_group_name'", operatorName) + if c.LogGroupName == "" && len(c.LogGroups) == 0 && c.LogGroupPrefix == "" { + 
// LogGroupName is deprecated, do not log it + return nil, fmt.Errorf("missing required %s parameter 'log_groups', or log_group_prefix", operatorName) } if len(c.LogStreamNames) > 0 && c.LogStreamNamePrefix != "" { - return nil, fmt.Errorf("invalid configuration. Cannot use both 'log_stream_names' and 'log_stream_name_prefix' %s parameters", operatorName) + return nil, fmt.Errorf("invalid configuration. cannot use both 'log_stream_names' and 'log_stream_name_prefix' %s parameters", operatorName) } if c.Region == "" { @@ -89,6 +98,8 @@ func (c *CloudwatchInputConfig) Build(buildContext operator.BuildContext) ([]ope cloudwatchInput := &CloudwatchInput{ InputOperator: inputOperator, logGroupName: c.LogGroupName, + logGroups: c.LogGroups, + logGroupPrefix: c.LogGroupPrefix, logStreamNames: c.LogStreamNames, logStreamNamePrefix: c.LogStreamNamePrefix, region: c.Region, @@ -107,6 +118,8 @@ type CloudwatchInput struct { pollInterval helper.Duration logGroupName string + logGroups []string + logGroupPrefix string logStreamNames []*string logStreamNamePrefix string region string @@ -116,6 +129,8 @@ type CloudwatchInput struct { startTime int64 persist Persister wg sync.WaitGroup + + session *cloudwatchlogs.CloudWatchLogs } // Start will start generating log entries. @@ -123,8 +138,17 @@ func (c *CloudwatchInput) Start(persister operator.Persister) error { c.persist = Persister{DB: persister} ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel - c.wg.Add(1) - go c.pollEvents(ctx) + + if err := c.configureSession(); err != nil { + return fmt.Errorf("failed to configure AWS SDK: %s", err) + } + + c.buildLogGroupList() + + for _, logGroup := range c.logGroups { + c.wg.Add(1) + go c.pollEvents(ctx, logGroup) + } return nil } @@ -137,18 +161,12 @@ func (c *CloudwatchInput) Stop() error { } // pollEvents gets events from AWS Cloudwatch Logs every poll interval. 
-func (c *CloudwatchInput) pollEvents(ctx context.Context) { - c.Infof("Started polling AWS Cloudwatch Logs group '%s' using poll interval of '%s'", c.logGroupName, c.pollInterval) +func (c *CloudwatchInput) pollEvents(ctx context.Context, logGroupName string) { + c.Infof("Started polling AWS Cloudwatch Logs group '%s' using poll interval of '%s'", logGroupName, c.pollInterval) defer c.wg.Done() - // Create session to use when connecting to AWS Cloudwatch Logs - svc, sessionErr := c.sessionBuilder() - if sessionErr != nil { - c.Errorf("failed to create new session: %s", sessionErr) - } - // Get events immediately when operator starts then use poll_interval duration. - err := c.getEvents(ctx, svc) + err := c.getEvents(ctx, logGroupName) if err != nil { c.Errorf("failed to get events: %s", err) } @@ -159,7 +177,7 @@ func (c *CloudwatchInput) pollEvents(ctx context.Context) { case <-ctx.Done(): return case <-time.After(c.pollInterval.Duration): - err := c.getEvents(ctx, svc) + err := c.getEvents(ctx, logGroupName) if err != nil { c.Errorf("failed to get events: %s", err) } @@ -167,6 +185,27 @@ func (c *CloudwatchInput) pollEvents(ctx context.Context) { } } +// configureSession configures access to AWS +func (c *CloudwatchInput) configureSession() error { + var lastError error + + sum := 0 + for i := 1; i < 5; i++ { + s, err := c.sessionBuilder() + if err != nil { + c.Errorf("failed to configure AWS session: %s", err) + lastError = err + sum += i + continue + } + + c.session = s + return nil + } + + return lastError +} + // sessionBuilder builds a session for AWS Cloudwatch Logs func (c *CloudwatchInput) sessionBuilder() (*cloudwatchlogs.CloudWatchLogs, error) { region := aws.String(c.region) @@ -187,27 +226,27 @@ func (c *CloudwatchInput) sessionBuilder() (*cloudwatchlogs.CloudWatchLogs, erro } // getEvents uses a session to get events from AWS Cloudwatch Logs -func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.CloudWatchLogs) error { +func 
(c *CloudwatchInput) getEvents(ctx context.Context, logGroupName string) error { nextToken := "" - st, err := c.persist.Read(ctx, c.ID()) + st, err := c.persist.Read(ctx, fmt.Sprintf("%s-%s", c.ID(), logGroupName)) if err != nil { c.Errorf("failed to get persist: %s", err) } - c.Debugf("Read start time %d from database", st) + c.Debugf("Read start time %d for log group %s from database", st, logGroupName) c.startTime = st if c.startAtEnd && c.startTime == 0 { c.startTime = currentTimeInUnixMilliseconds(time.Now()) c.Debugf("Setting start time to current time: %d", c.startTime) } - c.Debugf("Getting events from AWS Cloudwatch Logs groupname '%s' using start time of %s", c.logGroupName, fromUnixMilli(c.startTime)) + c.Debugf("Getting events from AWS Cloudwatch Logs groupname '%s' using start time of %s", logGroupName, fromUnixMilli(c.startTime)) for { select { case <-ctx.Done(): return nil default: - input := c.filterLogEventsInputBuilder(nextToken) + input := c.filterLogEventsInputBuilder(nextToken, logGroupName) - resp, err := svc.FilterLogEvents(&input) + resp, err := c.session.FilterLogEvents(&input) if err != nil { return err } @@ -216,7 +255,7 @@ func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.Clo break } - c.handleEvents(ctx, resp.Events) + c.handleEvents(ctx, resp.Events, logGroupName) if resp.NextToken == nil { break @@ -228,8 +267,8 @@ func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.Clo // filterLogEventsInputBuilder builds AWS Cloudwatch Logs Filter Log Events Input based on provided values // and returns completed input. 
-func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwatchlogs.FilterLogEventsInput { - logGroupNamePtr := aws.String(c.logGroupName) +func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string, logGroupName string) cloudwatchlogs.FilterLogEventsInput { + logGroupNamePtr := aws.String(logGroupName) limit := aws.Int64(c.eventLimit) startTime := aws.Int64(c.startTime) @@ -295,7 +334,7 @@ func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwat } // handleEvent is the handler for a AWS Cloudwatch Logs Filtered Event. -func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent) { +func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent, logGroupName string) { e := map[string]interface{}{} if event.Message != nil { e["message"] = *event.Message @@ -308,7 +347,7 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs c.Errorf("Failed to create new entry from record: %s", err) } - entry.AddResourceKey("log_group", c.logGroupName) + entry.AddResourceKey("log_group", logGroupName) entry.AddResourceKey("region", c.region) entry.AddResourceKey("log_stream", *event.LogStreamName) entry.AddResourceKey("event_id", *event.EventId) @@ -321,16 +360,71 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs if *event.IngestionTime > c.startTime { c.startTime = *event.IngestionTime c.Debugf("Writing start time %d to database", *event.IngestionTime) - err := c.persist.Write(ctx, c.logGroupName, c.startTime) + err := c.persist.Write(ctx, logGroupName, c.startTime) if err != nil { c.Errorf("Failed to update persistent storage: %w", err) } } } -func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent) { +func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent, logGroupName string) { for _, event := range 
events { - c.handleEvent(ctx, event) + c.handleEvent(ctx, event, logGroupName) + } +} + +// buildLogGroupList merges log_group_name and log_group_prefix into log_groups +func (c *CloudwatchInput) buildLogGroupList() { + if c.logGroupName != "" { + found := false + for _, group := range c.logGroups { + if c.logGroupName == group { + found = true + break + } + } + if !found { + c.logGroups = append(c.logGroups, c.logGroupName) + } + } + + if c.logGroupPrefix != "" { + c.detectLogGroups() + } +} + +// detectLogGroups detects log groups from a prefix +func (c *CloudwatchInput) detectLogGroups() { + limit := int64(50) // Max allowed by aws + req := &cloudwatchlogs.DescribeLogGroupsInput{ + Limit: &limit, + LogGroupNamePrefix: &c.logGroupPrefix, + } + + resp, err := c.session.DescribeLogGroups(req) + if err != nil { + c.Errorf("failed to detect log group names: %s", err) + return + } + + for _, logGroup := range resp.LogGroups { + g := *logGroup.LogGroupName + + found := false + for _, logGroup := range c.logGroups { + if logGroup == g { + found = true + break + } + } + if !found { + c.Debugf("detected log group '%s'", g) + c.logGroups = append(c.logGroups, g) + } + } + + if resp.NextToken != nil { + req.NextToken = resp.NextToken } } diff --git a/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch_persist.go b/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch_persist.go index c296d176a..456dea439 100644 --- a/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch_persist.go +++ b/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch_persist.go @@ -22,6 +22,9 @@ func (p *Persister) Read(ctx context.Context, key string) (int64, error) { buffer := bytes.NewBuffer(startTimeBytes) var startTime int64 err = binary.Read(buffer, binary.BigEndian, &startTime) + if err != nil && err.Error() != "EOF" { + return 0, err + } return startTime, err } diff --git a/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch_test.go 
b/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch_test.go index 8e44ea819..959bf7fae 100644 --- a/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch_test.go +++ b/pkg/receiver/operators/input/aws/cloudwatch/cloudwatch_test.go @@ -10,15 +10,14 @@ import ( ) func TestBuild(t *testing.T) { - logGroupName := "test" basicConfig := func() *CloudwatchInputConfig { cfg := NewCloudwatchConfig("test_operator_id") - cfg.LogGroupName = logGroupName cfg.Region = "test-region" return cfg } - var testStreams = []*string{&logGroupName} + var stream = "test stream" + var testStreams = []*string{&stream} cases := []struct { name string input *CloudwatchInputConfig @@ -28,6 +27,10 @@ func TestBuild(t *testing.T) { "default", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } return cfg }(), false, @@ -36,6 +39,10 @@ func TestBuild(t *testing.T) { "log-stream-name-prefix", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.LogStreamNamePrefix = "" return cfg }(), @@ -45,6 +52,10 @@ func TestBuild(t *testing.T) { "event-limit", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.EventLimit = 5000 return cfg }(), @@ -54,6 +65,10 @@ func TestBuild(t *testing.T) { "poll-interval", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.PollInterval = helper.Duration{Duration: 15 * time.Second} return cfg }(), @@ -63,6 +78,10 @@ func TestBuild(t *testing.T) { "profile", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.Profile = "test" return cfg }(), @@ -72,6 +91,10 @@ func TestBuild(t *testing.T) { "log-stream-names", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.LogStreamNames = testStreams return cfg }(), @@ -81,6 +104,10 @@ 
func TestBuild(t *testing.T) { "startat-end", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.StartAt = "end" return cfg }(), @@ -90,8 +117,12 @@ func TestBuild(t *testing.T) { "logStreamNames and logStreamNamePrefix both parameters Error", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.LogStreamNames = testStreams - cfg.LogStreamNamePrefix = logGroupName + cfg.LogStreamNamePrefix = "some prefix" return cfg }(), true, @@ -100,8 +131,12 @@ func TestBuild(t *testing.T) { "startat-beginning", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.StartAt = "beginning" - cfg.LogStreamNamePrefix = logGroupName + cfg.LogStreamNamePrefix = "some prefix" return cfg }(), false, @@ -110,6 +145,10 @@ func TestBuild(t *testing.T) { "poll-interval-invalid", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.PollInterval = helper.Duration{Duration: time.Second * 0} return cfg }(), @@ -119,6 +158,10 @@ func TestBuild(t *testing.T) { "event-limit-invalid", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.EventLimit = 10001 return cfg }(), @@ -128,11 +171,95 @@ func TestBuild(t *testing.T) { "default-required-startat-invalid", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.StartAt = "invalid" return cfg }(), true, }, + { + "log-group-name", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupName = "test" + return cfg + }(), + false, + }, + { + "log-groups", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } + return cfg + }(), + false, + }, + { + "log-groups-and-log-group-name", + func() *CloudwatchInputConfig { + cfg := basicConfig() + 
cfg.LogGroupName = "test" + cfg.LogGroups = []string{ + "test", + "test-2", + } + return cfg + }(), + false, + }, + { + "log-group-prefix", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupPrefix = "/aws" + return cfg + }(), + false, + }, + { + "log-group-prefix-and-log-groups", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupPrefix = "/aws" + cfg.LogGroups = []string{ + "test", + "test-2", + } + return cfg + }(), + false, + }, + { + "log-group-prefix-and-log-group-name", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupPrefix = "/aws" + cfg.LogGroupName = "test" + return cfg + }(), + false, + }, + { + "log_group_prefix-log_group_name-log_groups", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupPrefix = "/aws" + cfg.LogGroupName = "test" + cfg.LogGroups = []string{ + "test", + "aws", + } + return cfg + }(), + false, + }, } for _, tc := range cases {