Skip to content

Commit

Permalink
Cloudwatch multi group (#420)
Browse files Browse the repository at this point in the history
* check for EOF error when reading from offset database

* initial support for multiple log groups

* support log group prefix for dynamic log group detection

* configure aws session once

* allow all 3 input params: log_group_name, log_groups, log_group_prefix and merge them

* move log group merge logic to buildLogGroupList method

* handle session error
  • Loading branch information
Joseph Sirianni authored Sep 14, 2021
1 parent 77e05b9 commit 7a6a884
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 33 deletions.
156 changes: 125 additions & 31 deletions operator/builtin/input/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,17 @@ func NewCloudwatchConfig(operatorID string) *CloudwatchInputConfig {
type CloudwatchInputConfig struct {
helper.InputConfig `yaml:",inline"`

// required
// LogGroupName is deprecated but still supported for compatibility with older configurations
LogGroupName string `json:"log_group_name,omitempty" yaml:"log_group_name,omitempty"`
Region string `json:"region,omitempty" yaml:"region,omitempty"`

// LogGroups is a list of log groups
LogGroups []string `json:"log_groups,omitempty" yaml:"log_groups,omitempty"`

// LogGroupPrefix is used to append to LogGroups and can be used in conjunction with LogGroups
LogGroupPrefix string `json:"log_group_prefix,omitempty" yaml:"log_group_prefix,omitempty"`

// Region is the AWS region to target
Region string `json:"region,omitempty" yaml:"region,omitempty"`

// optional
LogStreamNamePrefix string `json:"log_stream_name_prefix,omitempty" yaml:"log_stream_name_prefix,omitempty"`
Expand All @@ -56,12 +64,13 @@ func (c *CloudwatchInputConfig) Build(buildContext operator.BuildContext) ([]ope
return nil, err
}

if c.LogGroupName == "" {
return nil, fmt.Errorf("missing required %s parameter 'log_group_name'", operatorName)
if c.LogGroupName == "" && len(c.LogGroups) == 0 && c.LogGroupPrefix == "" {
// LogGroupName is depricated, do not log it
return nil, fmt.Errorf("missing required %s parameter 'log_groups', or log_group_prefix", operatorName)
}

if len(c.LogStreamNames) > 0 && c.LogStreamNamePrefix != "" {
return nil, fmt.Errorf("invalid configuration. Cannot use both 'log_stream_names' and 'log_stream_name_prefix' %s parameters", operatorName)
return nil, fmt.Errorf("invalid configuration, cannot use both 'log_stream_names' and 'log_stream_name_prefix' %s parameters", operatorName)
}

if c.Region == "" {
Expand Down Expand Up @@ -89,6 +98,8 @@ func (c *CloudwatchInputConfig) Build(buildContext operator.BuildContext) ([]ope
cloudwatchInput := &CloudwatchInput{
InputOperator: inputOperator,
logGroupName: c.LogGroupName,
logGroups: c.LogGroups,
logGroupPrefix: c.LogGroupPrefix,
logStreamNames: c.LogStreamNames,
logStreamNamePrefix: c.LogStreamNamePrefix,
region: c.Region,
Expand All @@ -110,6 +121,8 @@ type CloudwatchInput struct {
pollInterval helper.Duration

logGroupName string
logGroups []string
logGroupPrefix string
logStreamNames []*string
logStreamNamePrefix string
region string
Expand All @@ -119,18 +132,29 @@ type CloudwatchInput struct {
startTime int64
persist Persister
wg sync.WaitGroup

session *cloudwatchlogs.CloudWatchLogs
}

// Start will start generating log entries.
func (c *CloudwatchInput) Start() error {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel

if err := c.configureSession(); err != nil {
return fmt.Errorf("failed to configure AWS SDK: %s", err)
}

c.buildLogGroupList()

if err := c.persist.DB.Load(); err != nil {
return err
}
c.wg.Add(1)
go c.pollEvents(ctx)

for _, logGroup := range c.logGroups {
c.wg.Add(1)
go c.pollEvents(ctx, logGroup)
}
return nil
}

Expand All @@ -143,18 +167,12 @@ func (c *CloudwatchInput) Stop() error {
}

// pollEvents gets events from AWS Cloudwatch Logs every poll interval.
func (c *CloudwatchInput) pollEvents(ctx context.Context) {
c.Infof("Started polling AWS Cloudwatch Logs group '%s' using poll interval of '%s'", c.logGroupName, c.pollInterval)
func (c *CloudwatchInput) pollEvents(ctx context.Context, logGroupName string) {
c.Infof("Started polling AWS Cloudwatch Logs group '%s' using poll interval of '%s'", logGroupName, c.pollInterval)
defer c.wg.Done()

// Create session to use when connecting to AWS Cloudwatch Logs
svc, sessionErr := c.sessionBuilder()
if sessionErr != nil {
c.Errorf("failed to create new session: %s", sessionErr)
}

// Get events immediately when operator starts then use poll_interval duration.
err := c.getEvents(ctx, svc)
err := c.getEvents(ctx, logGroupName)
if err != nil {
c.Errorf("failed to get events: %s", err)
}
Expand All @@ -165,14 +183,35 @@ func (c *CloudwatchInput) pollEvents(ctx context.Context) {
case <-ctx.Done():
return
case <-time.After(c.pollInterval.Duration):
err := c.getEvents(ctx, svc)
err := c.getEvents(ctx, logGroupName)
if err != nil {
c.Errorf("failed to get events: %s", err)
}
}
}
}

// configureSession configures access to AWS
func (c *CloudwatchInput) configureSession() error {
var lastError error

sum := 0
for i := 1; i < 5; i++ {
s, err := c.sessionBuilder()
if err != nil {
c.Errorf("failed to configure AWS session: %s", err)
lastError = err
sum += i
continue
}

c.session = s
return nil
}

return lastError
}

// sessionBuilder builds a session for AWS Cloudwatch Logs
func (c *CloudwatchInput) sessionBuilder() (*cloudwatchlogs.CloudWatchLogs, error) {
region := aws.String(c.region)
Expand All @@ -193,27 +232,27 @@ func (c *CloudwatchInput) sessionBuilder() (*cloudwatchlogs.CloudWatchLogs, erro
}

// getEvents uses a session to get events from AWS Cloudwatch Logs
func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.CloudWatchLogs) error {
func (c *CloudwatchInput) getEvents(ctx context.Context, logGroupName string) error {
nextToken := ""
st, err := c.persist.Read(c.logGroupName)
st, err := c.persist.Read(logGroupName)
if err != nil {
c.Errorf("failed to get persist: %s", err)
}
c.Debugf("Read start time %d from database", st)
c.Debugf("Read start time %d for log group %s from database", st, logGroupName)
c.startTime = st
if c.startAtEnd && c.startTime == 0 {
c.startTime = currentTimeInUnixMilliseconds(time.Now())
c.Debugf("Setting start time to current time: %d", c.startTime)
}
c.Debugf("Getting events from AWS Cloudwatch Logs groupname '%s' using start time of %s", c.logGroupName, fromUnixMilli(c.startTime))
c.Debugf("Getting events from AWS Cloudwatch Logs groupname '%s' using start time of %s", logGroupName, fromUnixMilli(c.startTime))
for {
select {
case <-ctx.Done():
return nil
default:
input := c.filterLogEventsInputBuilder(nextToken)
input := c.filterLogEventsInputBuilder(nextToken, logGroupName)

resp, err := svc.FilterLogEvents(&input)
resp, err := c.session.FilterLogEvents(&input)
if err != nil {
return err
}
Expand All @@ -222,7 +261,7 @@ func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.Clo
break
}

c.handleEvents(ctx, resp.Events)
c.handleEvents(ctx, resp.Events, logGroupName)

if resp.NextToken == nil {
break
Expand All @@ -234,8 +273,8 @@ func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.Clo

// filterLogEventsInputBuilder builds AWS Cloudwatch Logs Filter Log Events Input based on provided values
// and returns completed input.
func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwatchlogs.FilterLogEventsInput {
logGroupNamePtr := aws.String(c.logGroupName)
func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string, logGroupName string) cloudwatchlogs.FilterLogEventsInput {
logGroupNamePtr := aws.String(logGroupName)
limit := aws.Int64(c.eventLimit)
startTime := aws.Int64(c.startTime)

Expand Down Expand Up @@ -301,7 +340,7 @@ func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwat
}

// handleEvent is the handler for a AWS Cloudwatch Logs Filtered Event.
func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent) {
func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent, logGroupName string) {
e := map[string]interface{}{
"message": event.Message,
"ingestion_time": event.IngestionTime,
Expand All @@ -311,7 +350,7 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs
c.Errorf("Failed to create new entry from record: %s", err)
}

entry.AddResourceKey("log_group", c.logGroupName)
entry.AddResourceKey("log_group", logGroupName)
entry.AddResourceKey("region", c.region)
entry.AddResourceKey("log_stream", *event.LogStreamName)
entry.AddResourceKey("event_id", *event.EventId)
Expand All @@ -324,19 +363,74 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs
if *event.IngestionTime > c.startTime {
c.startTime = *event.IngestionTime
c.Debugf("Writing start time %d to database", *event.IngestionTime)
c.persist.Write(c.logGroupName, c.startTime)
c.persist.Write(logGroupName, c.startTime)
}
}

func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent) {
func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent, logGroupName string) {
for _, event := range events {
c.handleEvent(ctx, event)
c.handleEvent(ctx, event, logGroupName)
}
if err := c.persist.DB.Sync(); err != nil {
c.Errorf("Failed to sync offset database: %s", err)
}
}

// buildLogGroupList merges log_group_name and log_group_prefix into log_groups
func (c *CloudwatchInput) buildLogGroupList() {
if c.logGroupName != "" {
found := false
for _, group := range c.logGroups {
if c.logGroupName == group {
found = true
break
}
}
if !found {
c.logGroups = append(c.logGroups, c.logGroupName)
}
}

if c.logGroupPrefix != "" {
c.detectLogGroups()
}
}

// detectLogGroups detects log groups from a prefix
func (c *CloudwatchInput) detectLogGroups() {
limit := int64(50) // Max allowed by aws
req := &cloudwatchlogs.DescribeLogGroupsInput{
Limit: &limit,
LogGroupNamePrefix: &c.logGroupPrefix,
}

resp, err := c.session.DescribeLogGroups(req)
if err != nil {
c.Errorf("failed to detect log group names: %s", err)
return
}

for _, logGroup := range resp.LogGroups {
g := *logGroup.LogGroupName

found := false
for _, logGroup := range c.logGroups {
if logGroup == g {
found = true
break
}
}
if !found {
c.Debugf("detected log group '%s'", g)
c.logGroups = append(c.logGroups, g)
}
}

if resp.NextToken != nil {
req.NextToken = resp.NextToken
}
}

// Returns time.Now() as Unix Time in Milliseconds
func currentTimeInUnixMilliseconds(timeNow time.Time) int64 {
return timeNow.UnixNano() / int64(time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ func (p *Persister) Read(key string) (int64, error) {
var startTime int64
buffer := bytes.NewBuffer(p.DB.Get(key))
err := binary.Read(buffer, binary.BigEndian, &startTime)
return startTime, err
if err != nil && err.Error() != "EOF" {
return 0, err
}
return startTime, nil
}

// Helper function to set persisted data
Expand Down
Loading

0 comments on commit 7a6a884

Please sign in to comment.