diff --git a/operator/builtin/input/aws/cloudwatch/cloudwatch.go b/operator/builtin/input/aws/cloudwatch/cloudwatch.go index 54430d390..38e89fdf2 100644 --- a/operator/builtin/input/aws/cloudwatch/cloudwatch.go +++ b/operator/builtin/input/aws/cloudwatch/cloudwatch.go @@ -36,9 +36,17 @@ func NewCloudwatchConfig(operatorID string) *CloudwatchInputConfig { type CloudwatchInputConfig struct { helper.InputConfig `yaml:",inline"` - // required + // LogGroupName is deprecated but still supported for compatibility with older configurations LogGroupName string `json:"log_group_name,omitempty" yaml:"log_group_name,omitempty"` - Region string `json:"region,omitempty" yaml:"region,omitempty"` + + // LogGroups is a list of log groups + LogGroups []string `json:"log_groups,omitempty" yaml:"log_groups,omitempty"` + + // LogGroupPrefix is used to append to LogGroups and can be used in conjunction with LogGroups + LogGroupPrefix string `json:"log_group_prefix,omitempty" yaml:"log_group_prefix,omitempty"` + + // Region is the AWS region to target + Region string `json:"region,omitempty" yaml:"region,omitempty"` // optional LogStreamNamePrefix string `json:"log_stream_name_prefix,omitempty" yaml:"log_stream_name_prefix,omitempty"` @@ -56,12 +64,13 @@ func (c *CloudwatchInputConfig) Build(buildContext operator.BuildContext) ([]ope return nil, err } - if c.LogGroupName == "" { - return nil, fmt.Errorf("missing required %s parameter 'log_group_name'", operatorName) + if c.LogGroupName == "" && len(c.LogGroups) == 0 && c.LogGroupPrefix == "" { + // LogGroupName is depricated, do not log it + return nil, fmt.Errorf("missing required %s parameter 'log_groups', or log_group_prefix", operatorName) } if len(c.LogStreamNames) > 0 && c.LogStreamNamePrefix != "" { - return nil, fmt.Errorf("invalid configuration. Cannot use both 'log_stream_names' and 'log_stream_name_prefix' %s parameters", operatorName) + return nil, fmt.Errorf("invalid configuration, cannot use both 'log_stream_names' and 'log_stream_name_prefix' %s parameters", operatorName) } if c.Region == "" { @@ -89,6 +98,8 @@ func (c *CloudwatchInputConfig) Build(buildContext operator.BuildContext) ([]ope cloudwatchInput := &CloudwatchInput{ InputOperator: inputOperator, logGroupName: c.LogGroupName, + logGroups: c.LogGroups, + logGroupPrefix: c.LogGroupPrefix, logStreamNames: c.LogStreamNames, logStreamNamePrefix: c.LogStreamNamePrefix, region: c.Region, @@ -110,6 +121,8 @@ type CloudwatchInput struct { pollInterval helper.Duration logGroupName string + logGroups []string + logGroupPrefix string logStreamNames []*string logStreamNamePrefix string region string @@ -119,6 +132,8 @@ type CloudwatchInput struct { startTime int64 persist Persister wg sync.WaitGroup + + session *cloudwatchlogs.CloudWatchLogs } // Start will start generating log entries. @@ -126,11 +141,20 @@ func (c *CloudwatchInput) Start() error { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel + if err := c.configureSession(); err != nil { + return fmt.Errorf("failed to configure AWS SDK: %s", err) + } + + c.buildLogGroupList() + if err := c.persist.DB.Load(); err != nil { return err } - c.wg.Add(1) - go c.pollEvents(ctx) + + for _, logGroup := range c.logGroups { + c.wg.Add(1) + go c.pollEvents(ctx, logGroup) + } return nil } @@ -143,18 +167,12 @@ func (c *CloudwatchInput) Stop() error { } // pollEvents gets events from AWS Cloudwatch Logs every poll interval. -func (c *CloudwatchInput) pollEvents(ctx context.Context) { - c.Infof("Started polling AWS Cloudwatch Logs group '%s' using poll interval of '%s'", c.logGroupName, c.pollInterval) +func (c *CloudwatchInput) pollEvents(ctx context.Context, logGroupName string) { + c.Infof("Started polling AWS Cloudwatch Logs group '%s' using poll interval of '%s'", logGroupName, c.pollInterval) defer c.wg.Done() - // Create session to use when connecting to AWS Cloudwatch Logs - svc, sessionErr := c.sessionBuilder() - if sessionErr != nil { - c.Errorf("failed to create new session: %s", sessionErr) - } - // Get events immediately when operator starts then use poll_interval duration. - err := c.getEvents(ctx, svc) + err := c.getEvents(ctx, logGroupName) if err != nil { c.Errorf("failed to get events: %s", err) } @@ -165,7 +183,7 @@ func (c *CloudwatchInput) pollEvents(ctx context.Context) { case <-ctx.Done(): return case <-time.After(c.pollInterval.Duration): - err := c.getEvents(ctx, svc) + err := c.getEvents(ctx, logGroupName) if err != nil { c.Errorf("failed to get events: %s", err) } @@ -173,6 +191,27 @@ func (c *CloudwatchInput) pollEvents(ctx context.Context) { } } +// configureSession configures access to AWS +func (c *CloudwatchInput) configureSession() error { + var lastError error + + sum := 0 + for i := 1; i < 5; i++ { + s, err := c.sessionBuilder() + if err != nil { + c.Errorf("failed to configure AWS session: %s", err) + lastError = err + sum += i + continue + } + + c.session = s + return nil + } + + return lastError +} + // sessionBuilder builds a session for AWS Cloudwatch Logs func (c *CloudwatchInput) sessionBuilder() (*cloudwatchlogs.CloudWatchLogs, error) { region := aws.String(c.region) @@ -193,27 +232,27 @@ func (c *CloudwatchInput) sessionBuilder() (*cloudwatchlogs.CloudWatchLogs, erro } // getEvents uses a session to get events from AWS Cloudwatch Logs -func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.CloudWatchLogs) error { +func (c *CloudwatchInput) getEvents(ctx context.Context, logGroupName string) error { nextToken := "" - st, err := c.persist.Read(c.logGroupName) + st, err := c.persist.Read(logGroupName) if err != nil { c.Errorf("failed to get persist: %s", err) } - c.Debugf("Read start time %d from database", st) + c.Debugf("Read start time %d for log group %s from database", st, logGroupName) c.startTime = st if c.startAtEnd && c.startTime == 0 { c.startTime = currentTimeInUnixMilliseconds(time.Now()) c.Debugf("Setting start time to current time: %d", c.startTime) } - c.Debugf("Getting events from AWS Cloudwatch Logs groupname '%s' using start time of %s", c.logGroupName, fromUnixMilli(c.startTime)) + c.Debugf("Getting events from AWS Cloudwatch Logs groupname '%s' using start time of %s", logGroupName, fromUnixMilli(c.startTime)) for { select { case <-ctx.Done(): return nil default: - input := c.filterLogEventsInputBuilder(nextToken) + input := c.filterLogEventsInputBuilder(nextToken, logGroupName) - resp, err := svc.FilterLogEvents(&input) + resp, err := c.session.FilterLogEvents(&input) if err != nil { return err } @@ -222,7 +261,7 @@ func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.Clo break } - c.handleEvents(ctx, resp.Events) + c.handleEvents(ctx, resp.Events, logGroupName) if resp.NextToken == nil { break @@ -234,8 +273,8 @@ func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.Clo // filterLogEventsInputBuilder builds AWS Cloudwatch Logs Filter Log Events Input based on provided values // and returns completed input. -func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwatchlogs.FilterLogEventsInput { - logGroupNamePtr := aws.String(c.logGroupName) +func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string, logGroupName string) cloudwatchlogs.FilterLogEventsInput { + logGroupNamePtr := aws.String(logGroupName) limit := aws.Int64(c.eventLimit) startTime := aws.Int64(c.startTime) @@ -301,7 +340,7 @@ func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwat } // handleEvent is the handler for a AWS Cloudwatch Logs Filtered Event. -func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent) { +func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent, logGroupName string) { e := map[string]interface{}{ "message": event.Message, "ingestion_time": event.IngestionTime, @@ -311,7 +350,7 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs c.Errorf("Failed to create new entry from record: %s", err) } - entry.AddResourceKey("log_group", c.logGroupName) + entry.AddResourceKey("log_group", logGroupName) entry.AddResourceKey("region", c.region) entry.AddResourceKey("log_stream", *event.LogStreamName) entry.AddResourceKey("event_id", *event.EventId) @@ -324,19 +363,74 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs if *event.IngestionTime > c.startTime { c.startTime = *event.IngestionTime c.Debugf("Writing start time %d to database", *event.IngestionTime) - c.persist.Write(c.logGroupName, c.startTime) + c.persist.Write(logGroupName, c.startTime) } } -func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent) { +func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent, logGroupName string) { for _, event := range events { - c.handleEvent(ctx, event) + c.handleEvent(ctx, event, logGroupName) } if err := c.persist.DB.Sync(); err != nil { c.Errorf("Failed to sync offset database: %s", err) } } +// buildLogGroupList merges log_group_name and log_group_prefix into log_groups +func (c *CloudwatchInput) buildLogGroupList() { + if c.logGroupName != "" { + found := false + for _, group := range c.logGroups { + if c.logGroupName == group { + found = true + break + } + } + if !found { + c.logGroups = append(c.logGroups, c.logGroupName) + } + } + + if c.logGroupPrefix != "" { + c.detectLogGroups() + } +} + +// detectLogGroups detects log groups from a prefix +func (c *CloudwatchInput) detectLogGroups() { + limit := int64(50) // Max allowed by aws + req := &cloudwatchlogs.DescribeLogGroupsInput{ + Limit: &limit, + LogGroupNamePrefix: &c.logGroupPrefix, + } + + resp, err := c.session.DescribeLogGroups(req) + if err != nil { + c.Errorf("failed to detect log group names: %s", err) + return + } + + for _, logGroup := range resp.LogGroups { + g := *logGroup.LogGroupName + + found := false + for _, logGroup := range c.logGroups { + if logGroup == g { + found = true + break + } + } + if !found { + c.Debugf("detected log group '%s'", g) + c.logGroups = append(c.logGroups, g) + } + } + + if resp.NextToken != nil { + req.NextToken = resp.NextToken + } +} + // Returns time.Now() as Unix Time in Milliseconds func currentTimeInUnixMilliseconds(timeNow time.Time) int64 { return timeNow.UnixNano() / int64(time.Millisecond) diff --git a/operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go b/operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go index cc251e302..8d013007e 100644 --- a/operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go +++ b/operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go @@ -16,7 +16,10 @@ func (p *Persister) Read(key string) (int64, error) { var startTime int64 buffer := bytes.NewBuffer(p.DB.Get(key)) err := binary.Read(buffer, binary.BigEndian, &startTime) - return startTime, err + if err != nil && err.Error() != "EOF" { + return 0, err + } + return startTime, nil } // Helper function to set persisted data diff --git a/operator/builtin/input/aws/cloudwatch/cloudwatch_test.go b/operator/builtin/input/aws/cloudwatch/cloudwatch_test.go index 7751cd739..eb02dd96b 100644 --- a/operator/builtin/input/aws/cloudwatch/cloudwatch_test.go +++ b/operator/builtin/input/aws/cloudwatch/cloudwatch_test.go @@ -12,7 +12,6 @@ import ( func TestBuild(t *testing.T) { basicConfig := func() *CloudwatchInputConfig { cfg := NewCloudwatchConfig("test_operator_id") - cfg.LogGroupName = "test" cfg.Region = "test" return cfg } @@ -28,6 +27,10 @@ func TestBuild(t *testing.T) { "default", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } return cfg }(), false, @@ -36,6 +39,10 @@ func TestBuild(t *testing.T) { "log-stream-name-prefix", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.LogStreamNamePrefix = "" return cfg }(), @@ -45,6 +52,10 @@ func TestBuild(t *testing.T) { "event-limit", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.EventLimit = 5000 return cfg }(), @@ -54,6 +65,10 @@ func TestBuild(t *testing.T) { "poll-interval", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.PollInterval = helper.Duration{Duration: 15 * time.Second} return cfg }(), @@ -63,6 +78,10 @@ func TestBuild(t *testing.T) { "profile", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.Profile = "test" return cfg }(), @@ -72,6 +91,10 @@ func TestBuild(t *testing.T) { "log-stream-names", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.LogStreamNames = testStreams return cfg }(), @@ -81,6 +104,10 @@ func TestBuild(t *testing.T) { "startat-end", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.StartAt = "end" return cfg }(), @@ -90,6 +117,10 @@ func TestBuild(t *testing.T) { "logStreamNames and logStreamNamePrefix both parameters Error", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.LogStreamNames = testStreams cfg.LogStreamNamePrefix = "test" return cfg @@ -100,6 +131,10 @@ func TestBuild(t *testing.T) { "startat-beginning", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.StartAt = "beginning" cfg.LogStreamNamePrefix = "test" return cfg @@ -110,6 +145,10 @@ func TestBuild(t *testing.T) { "poll-interval-invalid", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.PollInterval = helper.Duration{Duration: time.Second * 0} return cfg }(), @@ -119,6 +158,10 @@ func TestBuild(t *testing.T) { "event-limit-invalid", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.EventLimit = 10001 return cfg }(), @@ -128,11 +171,95 @@ func TestBuild(t *testing.T) { "default-required-startat-invalid", func() *CloudwatchInputConfig { cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } cfg.StartAt = "invalid" return cfg }(), true, }, + { + "log-group-name", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupName = "test" + return cfg + }(), + false, + }, + { + "log-groups", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroups = []string{ + "test", + "test-2", + } + return cfg + }(), + false, + }, + { + "log-groups-and-log-group-name", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupName = "test" + cfg.LogGroups = []string{ + "test", + "test-2", + } + return cfg + }(), + false, + }, + { + "log-group-prefix", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupPrefix = "/aws" + return cfg + }(), + false, + }, + { + "log-group-prefix-and-log-groups", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupPrefix = "/aws" + cfg.LogGroups = []string{ + "test", + "test-2", + } + return cfg + }(), + false, + }, + { + "log-group-prefix-and-log-group-name", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupPrefix = "/aws" + cfg.LogGroupName = "test" + return cfg + }(), + false, + }, + { + "log_group_prefix-log_group_name-log_groups", + func() *CloudwatchInputConfig { + cfg := basicConfig() + cfg.LogGroupPrefix = "/aws" + cfg.LogGroupName = "test" + cfg.LogGroups = []string{ + "test", + "aws", + } + return cfg + }(), + false, + }, } for _, tc := range cases {