Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
jsirianni committed Sep 16, 2021
1 parent ccc90ae commit a77e3af
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 36 deletions.
156 changes: 125 additions & 31 deletions pkg/receiver/operators/input/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,17 @@ func NewCloudwatchConfig(operatorID string) *CloudwatchInputConfig {
type CloudwatchInputConfig struct {
helper.InputConfig `yaml:",inline"`

// required
// LogGroupName is deprecated but still supported for compatibility with older configurations
LogGroupName string `json:"log_group_name,omitempty" yaml:"log_group_name,omitempty"`
Region string `json:"region,omitempty" yaml:"region,omitempty"`

// LogGroups is a list of log groups
LogGroups []string `json:"log_groups,omitempty" yaml:"log_groups,omitempty"`

// LogGroupPrefix is used to append to LogGroups and can be used in conjunction with LogGroups
LogGroupPrefix string `json:"log_group_prefix,omitempty" yaml:"log_group_prefix,omitempty"`

// Region is the AWS region to target
Region string `json:"region,omitempty" yaml:"region,omitempty"`

// optional
LogStreamNamePrefix string `json:"log_stream_name_prefix,omitempty" yaml:"log_stream_name_prefix,omitempty"`
Expand All @@ -56,12 +64,13 @@ func (c *CloudwatchInputConfig) Build(buildContext operator.BuildContext) ([]ope
return nil, err
}

if c.LogGroupName == "" {
return nil, fmt.Errorf("missing required %s parameter 'log_group_name'", operatorName)
if c.LogGroupName == "" && len(c.LogGroups) == 0 && c.LogGroupPrefix == "" {
// LogGroupName is depricated, do not log it
return nil, fmt.Errorf("missing required %s parameter 'log_groups', or log_group_prefix", operatorName)
}

if len(c.LogStreamNames) > 0 && c.LogStreamNamePrefix != "" {
return nil, fmt.Errorf("invalid configuration. Cannot use both 'log_stream_names' and 'log_stream_name_prefix' %s parameters", operatorName)
return nil, fmt.Errorf("invalid configuration. cannot use both 'log_stream_names' and 'log_stream_name_prefix' %s parameters", operatorName)
}

if c.Region == "" {
Expand Down Expand Up @@ -89,6 +98,8 @@ func (c *CloudwatchInputConfig) Build(buildContext operator.BuildContext) ([]ope
cloudwatchInput := &CloudwatchInput{
InputOperator: inputOperator,
logGroupName: c.LogGroupName,
logGroups: c.LogGroups,
logGroupPrefix: c.LogGroupPrefix,
logStreamNames: c.LogStreamNames,
logStreamNamePrefix: c.LogStreamNamePrefix,
region: c.Region,
Expand All @@ -107,6 +118,8 @@ type CloudwatchInput struct {
pollInterval helper.Duration

logGroupName string
logGroups []string
logGroupPrefix string
logStreamNames []*string
logStreamNamePrefix string
region string
Expand All @@ -116,15 +129,26 @@ type CloudwatchInput struct {
startTime int64
persist Persister
wg sync.WaitGroup

session *cloudwatchlogs.CloudWatchLogs
}

// Start will start generating log entries.
func (c *CloudwatchInput) Start(persister operator.Persister) error {
c.persist = Persister{DB: persister}
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
c.wg.Add(1)
go c.pollEvents(ctx)

if err := c.configureSession(); err != nil {
return fmt.Errorf("failed to configure AWS SDK: %s", err)
}

c.buildLogGroupList()

for _, logGroup := range c.logGroups {
c.wg.Add(1)
go c.pollEvents(ctx, logGroup)
}
return nil
}

Expand All @@ -137,18 +161,12 @@ func (c *CloudwatchInput) Stop() error {
}

// pollEvents gets events from AWS Cloudwatch Logs every poll interval.
func (c *CloudwatchInput) pollEvents(ctx context.Context) {
c.Infof("Started polling AWS Cloudwatch Logs group '%s' using poll interval of '%s'", c.logGroupName, c.pollInterval)
func (c *CloudwatchInput) pollEvents(ctx context.Context, logGroupName string) {
c.Infof("Started polling AWS Cloudwatch Logs group '%s' using poll interval of '%s'", logGroupName, c.pollInterval)
defer c.wg.Done()

// Create session to use when connecting to AWS Cloudwatch Logs
svc, sessionErr := c.sessionBuilder()
if sessionErr != nil {
c.Errorf("failed to create new session: %s", sessionErr)
}

// Get events immediately when operator starts then use poll_interval duration.
err := c.getEvents(ctx, svc)
err := c.getEvents(ctx, logGroupName)
if err != nil {
c.Errorf("failed to get events: %s", err)
}
Expand All @@ -159,14 +177,35 @@ func (c *CloudwatchInput) pollEvents(ctx context.Context) {
case <-ctx.Done():
return
case <-time.After(c.pollInterval.Duration):
err := c.getEvents(ctx, svc)
err := c.getEvents(ctx, logGroupName)
if err != nil {
c.Errorf("failed to get events: %s", err)
}
}
}
}

// configureSession configures access to AWS
func (c *CloudwatchInput) configureSession() error {
var lastError error

sum := 0
for i := 1; i < 5; i++ {
s, err := c.sessionBuilder()
if err != nil {
c.Errorf("failed to configure AWS session: %s", err)
lastError = err
sum += i
continue
}

c.session = s
return nil
}

return lastError
}

// sessionBuilder builds a session for AWS Cloudwatch Logs
func (c *CloudwatchInput) sessionBuilder() (*cloudwatchlogs.CloudWatchLogs, error) {
region := aws.String(c.region)
Expand All @@ -187,27 +226,27 @@ func (c *CloudwatchInput) sessionBuilder() (*cloudwatchlogs.CloudWatchLogs, erro
}

// getEvents uses a session to get events from AWS Cloudwatch Logs
func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.CloudWatchLogs) error {
func (c *CloudwatchInput) getEvents(ctx context.Context, logGroupName string) error {
nextToken := ""
st, err := c.persist.Read(ctx, c.ID())
st, err := c.persist.Read(ctx, fmt.Sprintf("%s-%s", c.ID(), logGroupName))
if err != nil {
c.Errorf("failed to get persist: %s", err)
}
c.Debugf("Read start time %d from database", st)
c.Debugf("Read start time %d for log group %s from database", st, logGroupName)
c.startTime = st
if c.startAtEnd && c.startTime == 0 {
c.startTime = currentTimeInUnixMilliseconds(time.Now())
c.Debugf("Setting start time to current time: %d", c.startTime)
}
c.Debugf("Getting events from AWS Cloudwatch Logs groupname '%s' using start time of %s", c.logGroupName, fromUnixMilli(c.startTime))
c.Debugf("Getting events from AWS Cloudwatch Logs groupname '%s' using start time of %s", logGroupName, fromUnixMilli(c.startTime))
for {
select {
case <-ctx.Done():
return nil
default:
input := c.filterLogEventsInputBuilder(nextToken)
input := c.filterLogEventsInputBuilder(nextToken, logGroupName)

resp, err := svc.FilterLogEvents(&input)
resp, err := c.session.FilterLogEvents(&input)
if err != nil {
return err
}
Expand All @@ -216,7 +255,7 @@ func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.Clo
break
}

c.handleEvents(ctx, resp.Events)
c.handleEvents(ctx, resp.Events, logGroupName)

if resp.NextToken == nil {
break
Expand All @@ -228,8 +267,8 @@ func (c *CloudwatchInput) getEvents(ctx context.Context, svc *cloudwatchlogs.Clo

// filterLogEventsInputBuilder builds AWS Cloudwatch Logs Filter Log Events Input based on provided values
// and returns completed input.
func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwatchlogs.FilterLogEventsInput {
logGroupNamePtr := aws.String(c.logGroupName)
func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string, logGroupName string) cloudwatchlogs.FilterLogEventsInput {
logGroupNamePtr := aws.String(logGroupName)
limit := aws.Int64(c.eventLimit)
startTime := aws.Int64(c.startTime)

Expand Down Expand Up @@ -295,7 +334,7 @@ func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwat
}

// handleEvent is the handler for a AWS Cloudwatch Logs Filtered Event.
func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent) {
func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent, logGroupName string) {
e := map[string]interface{}{}
if event.Message != nil {
e["message"] = *event.Message
Expand All @@ -308,7 +347,7 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs
c.Errorf("Failed to create new entry from record: %s", err)
}

entry.AddResourceKey("log_group", c.logGroupName)
entry.AddResourceKey("log_group", logGroupName)
entry.AddResourceKey("region", c.region)
entry.AddResourceKey("log_stream", *event.LogStreamName)
entry.AddResourceKey("event_id", *event.EventId)
Expand All @@ -321,16 +360,71 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs
if *event.IngestionTime > c.startTime {
c.startTime = *event.IngestionTime
c.Debugf("Writing start time %d to database", *event.IngestionTime)
err := c.persist.Write(ctx, c.logGroupName, c.startTime)
err := c.persist.Write(ctx, logGroupName, c.startTime)
if err != nil {
c.Errorf("Failed to update persistent storage: %w", err)
}
}
}

func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent) {
func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent, logGroupName string) {
for _, event := range events {
c.handleEvent(ctx, event)
c.handleEvent(ctx, event, logGroupName)
}
}

// buildLogGroupList merges log_group_name and log_group_prefix into log_groups
func (c *CloudwatchInput) buildLogGroupList() {
if c.logGroupName != "" {
found := false
for _, group := range c.logGroups {
if c.logGroupName == group {
found = true
break
}
}
if !found {
c.logGroups = append(c.logGroups, c.logGroupName)
}
}

if c.logGroupPrefix != "" {
c.detectLogGroups()
}
}

// detectLogGroups detects log groups from a prefix
func (c *CloudwatchInput) detectLogGroups() {
limit := int64(50) // Max allowed by aws
req := &cloudwatchlogs.DescribeLogGroupsInput{
Limit: &limit,
LogGroupNamePrefix: &c.logGroupPrefix,
}

resp, err := c.session.DescribeLogGroups(req)
if err != nil {
c.Errorf("failed to detect log group names: %s", err)
return
}

for _, logGroup := range resp.LogGroups {
g := *logGroup.LogGroupName

found := false
for _, logGroup := range c.logGroups {
if logGroup == g {
found = true
break
}
}
if !found {
c.Debugf("detected log group '%s'", g)
c.logGroups = append(c.logGroups, g)
}
}

if resp.NextToken != nil {
req.NextToken = resp.NextToken
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func (p *Persister) Read(ctx context.Context, key string) (int64, error) {
buffer := bytes.NewBuffer(startTimeBytes)
var startTime int64
err = binary.Read(buffer, binary.BigEndian, &startTime)
if err != nil && err.Error() != "EOF" {
return 0, err
}
return startTime, err
}

Expand Down
Loading

0 comments on commit a77e3af

Please sign in to comment.