diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ce95bb8a7529..132c7878a25f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc
@@ -81,6 +81,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix parsing of Elasticsearch node name by `elasticsearch/slowlog` fileset. {pull}14547[14547]
- threatintel module: Changed the type of `threatintel.indicator.first_seen` from `keyword` to `date`. {pull}26765[26765]
- Remove all alias fields pointing to ECS fields from modules. This affects the Suricata and Traefik modules. {issue}10535[10535] {pull}26627[26627]
+- Add option for S3 input to work without SQS notification. {issue}18205[18205] {pull}27332[27332]
*Heartbeat*
- Remove long deprecated `watch_poll` functionality. {pull}27166[27166]
diff --git a/filebeat/docs/modules/aws.asciidoc b/filebeat/docs/modules/aws.asciidoc index bb8269933a4c..78cdc5c961cc 100644 --- a/filebeat/docs/modules/aws.asciidoc +++ b/filebeat/docs/modules/aws.asciidoc
@@ -15,9 +15,14 @@ This file is generated! See scripts/docs_collector.py
beta[]
This is a module for aws logs. It uses filebeat s3 input to get log files from
-AWS S3 buckets with SQS notification. This module supports reading s3 server
-access logs with `s3access` fileset, ELB access logs with `elb` fileset, VPC
-flow logs with `vpcflow` fileset, and CloudTrail logs with `cloudtrail` fileset.
+AWS S3 buckets with SQS notification or by directly polling the list of S3 objects in an S3 bucket.
+The use of SQS notification is preferred: polling the list of S3 objects is expensive
+in terms of performance and costs, cannot scale horizontally without duplicating ingestion,
+and should be used only when no SQS notification can be attached to the S3 buckets.
+
+This module supports reading S3 server access logs with `s3access` fileset,
+ELB access logs with `elb` fileset, VPC flow logs with `vpcflow` fileset,
+and CloudTrail logs with `cloudtrail` fileset.
Access logs contain detailed information about the requests made to these services.
VPC flow logs captures information about the IP traffic going to and @@ -44,6 +49,9 @@ Example config: cloudtrail: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -58,6 +66,9 @@ Example config: cloudwatch: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -72,6 +83,9 @@ Example config: ec2: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -86,6 +100,9 @@ Example config: elb: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -100,6 +117,9 @@ Example config: s3access: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -114,6 +134,9 @@ Example config: vpcflow: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -128,7 +151,7 @@ Example config: *`var.queue_url`*:: -(Required) AWS SQS queue url. +AWS SQS queue url (Required when `var.bucket` is not set). *`var.visibility_timeout`*:: @@ -139,6 +162,19 @@ Default to be 300 seconds. Maximum duration before AWS API request will be interrupted. Default to be 120 seconds. +*`var.bucket`*:: + +AWS S3 bucket ARN (Required when `var.queue_url` is not set). + +*`var.number_of_workers`*:: + +Number of workers that will process the S3 objects listed (Required when `var.bucket` is set). +Use to vertically scale the input. + +*`var.bucket_list_interval`*:: + +Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. + *`var.endpoint`*:: Custom endpoint used to access AWS APIs. diff --git a/libbeat/publisher/testing/testing.go b/libbeat/publisher/testing/testing.go index 5b5e592d69ef..0c64e4601d5e 100644 --- a/libbeat/publisher/testing/testing.go +++ b/libbeat/publisher/testing/testing.go @@ -28,8 +28,9 @@ type TestPublisher struct { // given channel only. 
type ChanClient struct {
-	done    chan struct{}
-	Channel chan beat.Event
+	done            chan struct{}
+	Channel         chan beat.Event
+	publishCallback func(event beat.Event)
 }
 func PublisherWithClient(client beat.Client) beat.Pipeline {
@@ -44,6 +45,13 @@ func (pub *TestPublisher) ConnectWith(_ beat.ClientConfig) (beat.Client, error)
 	return pub.client, nil
 }
+func NewChanClientWithCallback(bufSize int, callback func(event beat.Event)) *ChanClient {
+	chanClient := NewChanClientWith(make(chan beat.Event, bufSize))
+	chanClient.publishCallback = callback
+
+	return chanClient
+}
+
 func NewChanClient(bufSize int) *ChanClient {
 	return NewChanClientWith(make(chan beat.Event, bufSize))
 }
@@ -70,6 +78,10 @@ func (c *ChanClient) Publish(event beat.Event) {
 	select {
 	case <-c.done:
 	case c.Channel <- event:
+		if c.publishCallback != nil {
+			c.publishCallback(event)
+			<-c.Channel
+		}
 	}
 }
diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index a302d0366b4a..647c3269c6bc 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -12,12 +12,19 @@ ++++
Use the `aws-s3` input to retrieve logs from S3 objects that are pointed to by
-S3 notification events read from an SQS queue. This input can, for example, be
+S3 notification events read from an SQS queue, or by directly polling the list of S3 objects in an S3 bucket.
+The use of SQS notification is preferred: polling the list of S3 objects is expensive
+in terms of performance and costs and should be used only when no SQS
+notification can be attached to the S3 buckets. This input can, for example, be
used to receive S3 access logs to monitor detailed records for the requests that are made to a bucket.
-This input depends on S3 notifications delivered to an SQS queue for
-`s3:ObjectCreated:*` events. You must create an SQS queue and configure S3
+The SQS notification method is enabled by setting the `queue_url` configuration value.
+The S3 bucket list polling method is enabled by setting the `bucket` configuration value.
+The two values cannot be set at the same time, and at least one of them must be set.
+
+When using the SQS notification method, this input depends on S3 notifications delivered
+to an SQS queue for `s3:ObjectCreated:*` events. You must create an SQS queue and configure S3
to publish events to the queue.
When processing a S3 object which pointed by a SQS message, if half of the set @@ -36,6 +43,24 @@ be stopped and the SQS message will be returned back to the queue.
  expand_event_list_from_field: Records
----
+
+When using the direct polling of the list of S3 objects in an S3 bucket,
+the number of workers that will process the listed S3 objects must be set
+through the `number_of_workers` config.
+The listing of the S3 bucket is polled at the time interval defined by
+the `bucket_list_interval` config. The default value is 120 seconds.
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: aws-s3
+  bucket: arn:aws:s3:::test-s3-bucket
+  number_of_workers: 5
+  bucket_list_interval: 300s
+  credential_profile_name: elastic-beats
+  expand_event_list_from_field: Records
+----
+
The `aws-s3` input supports the following configuration options plus the <<{beatname_lc}-input-{type}-common-options>> described later.
@@ -211,7 +236,7 @@ configuring multiline options.
[float]
==== `queue_url`
-URL of the AWS SQS queue that messages will be received from. Required.
+URL of the AWS SQS queue that messages will be received from.
(Required when `bucket` is not set).
[float]
==== `visibility_timeout`
@@ -242,6 +267,24 @@ The maximum duration that an SQS `ReceiveMessage` call should wait for a message to arrive in the queue before returning. The default value is `20s`. The maximum value is `20s`.
+[float]
+==== `bucket`
+
+ARN of the AWS S3 bucket that will be polled for list operations. (Required when `queue_url` is not set).
+
+[float]
+==== `bucket_list_interval`
+
+Time interval for polling the listing of the S3 bucket. The default value is `120s`.
+
+
+[float]
+==== `number_of_workers`
+
+Number of workers that will process the S3 objects listed. (Required when `bucket` is set).
+
+
+
[float]
==== `aws credentials`
@@ -251,7 +294,8 @@ see <> for more details.
[float]
=== AWS Permissions
-Specific AWS permissions are required for IAM user to access SQS and S3:
+Specific AWS permissions are required for the IAM user to access SQS and S3
+when using the SQS notification method:
----
s3:GetObject
@@ -260,6 +304,14 @@ sqs:ChangeMessageVisibility
sqs:DeleteMessage
----
+A reduced set of AWS permissions is required for the IAM user to access S3
+when using the S3 bucket list polling method:
+
+----
+s3:GetObject
+s3:ListBucket
+----
+
[float]
=== S3 and SQS setup
@@ -271,7 +323,7 @@ for more details.
[float]
=== Parallel Processing
-Multiple Filebeat instances can read from the same SQS queues at the same time.
+When using the SQS notification method, multiple {beatname_uc} instances can read from the same SQS queues at the same time.
To horizontally scale processing when there are large amounts of log data flowing into an S3 bucket, you can run multiple {beatname_uc} instances that read from the same SQS queues at the same time. No additional configuration is
@@ -282,8 +334,8 @@ when multiple {beatname_uc} instances are running in parallel. To prevent {beatname_uc} from receiving and processing the message more than once, set the visibility timeout.
-The visibility timeout begins when SQS returns a message to Filebeat. During
-this time, Filebeat processes and deletes the message. However, if Filebeat
+The visibility timeout begins when SQS returns a message to {beatname_uc}. During
+this time, {beatname_uc} processes and deletes the message. However, if {beatname_uc}
fails before deleting the message and your system doesn't call the DeleteMessage action for that message before the visibility timeout expires, the message becomes visible to other {beatname_uc} instances, and the message is received
@@ -291,6 +343,16 @@ again. By default, the visibility timeout is set to 5 minutes for aws-s3 input in {beatname_uc}. 5 minutes is sufficient time for {beatname_uc} to read SQS messages and process related s3 log files.
+When using the S3 bucket list polling method, be aware that if multiple {beatname_uc} instances are running,
+they can list the same S3 bucket at the same time. Since the state of the ingested S3 objects is persisted
+(upon processing every page of the listing operation) in the directory set by the `path.data` configuration,
+and multiple {beatname_uc} instances cannot share the same `path.data`, this will produce repeated
+ingestion of the S3 objects.
+Therefore, when using the S3 bucket list polling method, scaling should be
+vertical, with a single bigger {beatname_uc} instance and a higher `number_of_workers`
+config value.
+
+
[float]
=== Metrics
@@ -308,6 +370,9 @@ observe the activity of the input.
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `s3_objects_requested_total` | Number of S3 objects downloaded.
+| `s3_objects_listed_total` | Number of S3 objects returned by list operations.
+| `s3_objects_processed_total` | Number of S3 objects that matched file_selectors rules.
+| `s3_objects_acked_total` | Number of S3 objects processed that were fully ACKed.
| `s3_bytes_processed_total` | Number of S3 bytes processed. | `s3_events_created_total` | Number of events created from processing S3 data. | `s3_objects_inflight_gauge` | Number of S3 objects inflight (gauge).
diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/README.md b/x-pack/filebeat/input/awss3/_meta/terraform/README.md index cdb209e7099f..7ab27781704a 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/README.md +++ b/x-pack/filebeat/input/awss3/_meta/terraform/README.md
@@ -16,10 +16,12 @@ before running Terraform or the integration tests. The AWS key must be authorized to create and destroy S3 buckets and SQS queues.
1. Execute terraform in this directory to create the resources. This will also
-write the `outputs.yml`.
+write the `outputs.yml`. You can use `export TF_VAR_aws_region=NNNNN` in order
+to match the AWS region of the profile you are using.
`terraform apply`
+
2. (Optional) View the output configuration.
```yaml
@@ -28,14 +30,14 @@ write the `outputs.yml`.
"queue_url": "https://sqs.us-east-1.amazonaws.com/144492464627/filebeat-s3-integtest-8iok1h"
```
-2. Execute the integration test.
+4. Execute the integration test.
```
cd x-pack/filebeat/inputs/awss3
- go test -tags aws,integration -run TestInputRun -v .
+ go test -tags aws,integration -run TestInputRun.+ -v .
```
-3. Cleanup AWS resources. Execute terraform to remove the SQS queue and delete
+5. Cleanup AWS resources. Execute terraform to remove the SQS queue and delete
the S3 bucket and its contents. `terraform destroy`
diff --git a/x-pack/filebeat/input/awss3/acker.go b/x-pack/filebeat/input/awss3/acker.go index c9ab9a755049..ba80601997fe 100644 --- a/x-pack/filebeat/input/awss3/acker.go +++ b/x-pack/filebeat/input/awss3/acker.go
@@ -28,8 +28,8 @@ func newEventACKTracker(ctx context.Context) *eventACKTracker {
	return &eventACKTracker{ctx: ctx, cancel: cancel}
}
-// Add increments the number of pending ACKs by the specified amount.
-func (a *eventACKTracker) Add(messageCount int64) {
+// Add increments the number of pending ACKs.
+func (a *eventACKTracker) Add() {
	a.Lock()
	a.pendingACKs++
	a.Unlock()
@@ -51,6 +51,10 @@ func (a *eventACKTracker) ACK() {
}
// Wait waits for the number of pending ACKs to be zero.
+// Wait must be called sequentially, only after every expected
+// Add call has been made. Failing to do so could reset the pendingACKs
+// property to 0 and would result in Wait returning while additional
+// calls to `Add` are made without a corresponding `ACK` call.
func (a *eventACKTracker) Wait() {
	// If there were never any pending ACKs then cancel the context. (This can
	// happen when a document contains no events or cannot be read due to an error).
diff --git a/x-pack/filebeat/input/awss3/acker_test.go b/x-pack/filebeat/input/awss3/acker_test.go index 3a96997f9e9f..a038e8a39e44 100644 --- a/x-pack/filebeat/input/awss3/acker_test.go +++ b/x-pack/filebeat/input/awss3/acker_test.go @@ -18,7 +18,7 @@ func TestEventACKTracker(t *testing.T) { t.Cleanup(cancel) acker := newEventACKTracker(ctx) - acker.Add(1) + acker.Add() acker.ACK() assert.EqualValues(t, 0, acker.pendingACKs) @@ -42,7 +42,7 @@ func TestEventACKHandler(t *testing.T) { // Create acker. Add one pending ACK. acker := newEventACKTracker(ctx) - acker.Add(1) + acker.Add() // Create an ACK handler and simulate one ACKed event. ackHandler := newEventACKHandler() @@ -52,3 +52,18 @@ func TestEventACKHandler(t *testing.T) { assert.EqualValues(t, 0, acker.pendingACKs) assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) } + +func TestEventACKHandlerWait(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Create acker. Add one pending ACK. + acker := newEventACKTracker(ctx) + acker.Add() + acker.ACK() + acker.Wait() + acker.Add() + + assert.EqualValues(t, 1, acker.pendingACKs) + assert.ErrorIs(t, acker.ctx.Err(), context.Canceled) +} diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index d780f8eec895..9fef4a9fc600 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -25,7 +25,10 @@ type config struct { SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it. FIPSEnabled bool `config:"fips_enabled"` MaxNumberOfMessages int `config:"max_number_of_messages"` - QueueURL string `config:"queue_url" validate:"required"` + QueueURL string `config:"queue_url"` + Bucket string `config:"bucket"` + BucketListInterval time.Duration `config:"bucket_list_interval"` + NumberOfWorkers int `config:"number_of_workers"` AWSConfig awscommon.ConfigAWS `config:",inline"` FileSelectors []fileSelectorConfig `config:"file_selectors"` ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used. 
@@ -35,6 +38,7 @@ func defaultConfig() config { c := config{ APITimeout: 120 * time.Second, VisibilityTimeout: 300 * time.Second, + BucketListInterval: 120 * time.Second, SQSWaitTime: 20 * time.Second, SQSMaxReceiveCount: 5, FIPSEnabled: false, @@ -45,22 +49,39 @@ func defaultConfig() config { } func (c *config) Validate() error { - if c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12 { + if c.QueueURL == "" && c.Bucket == "" { + return fmt.Errorf("queue_url or bucket must provided") + } + + if c.QueueURL != "" && c.Bucket != "" { + return fmt.Errorf("queue_url <%v> and bucket <%v> "+ + "cannot be set at the same time", c.QueueURL, c.Bucket) + } + + if c.Bucket != "" && c.BucketListInterval <= 0 { + return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval) + } + + if c.Bucket != "" && c.NumberOfWorkers <= 0 { + return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers) + } + + if c.QueueURL != "" && (c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12) { return fmt.Errorf("visibility_timeout <%v> must be greater than 0 and "+ "less than or equal to 12h", c.VisibilityTimeout) } - if c.SQSWaitTime <= 0 || c.SQSWaitTime.Seconds() > 20 { + if c.QueueURL != "" && (c.SQSWaitTime <= 0 || c.SQSWaitTime.Seconds() > 20) { return fmt.Errorf("wait_time <%v> must be greater than 0 and "+ "less than or equal to 20s", c.SQSWaitTime) } - if c.MaxNumberOfMessages <= 0 { + if c.QueueURL != "" && c.MaxNumberOfMessages <= 0 { return fmt.Errorf("max_number_of_messages <%v> must be greater than 0", c.MaxNumberOfMessages) } - if c.APITimeout < c.SQSWaitTime { + if c.QueueURL != "" && c.APITimeout < c.SQSWaitTime { return fmt.Errorf("api_timeout <%v> must be greater than the sqs.wait_time <%v", c.APITimeout, c.SQSWaitTime) } diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index 77e35bcb0f33..c0fd94e2aee6 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -8,9 +8,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/dustin/go-humanize" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/match" @@ -20,17 +21,20 @@ import ( func TestConfig(t *testing.T) { const queueURL = "https://example.com" - makeConfig := func() config { + const s3Bucket = "arn:aws:s3:::aBucket" + makeConfig := func(quequeURL, s3Bucket string) config { // Have a separate copy of defaults in the test to make it clear when // anyone changes the defaults. 
parserConf := parser.Config{} require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom(""))) return config{ - QueueURL: queueURL, + QueueURL: quequeURL, + Bucket: s3Bucket, APITimeout: 120 * time.Second, VisibilityTimeout: 300 * time.Second, SQSMaxReceiveCount: 5, SQSWaitTime: 20 * time.Second, + BucketListInterval: 120 * time.Second, FIPSEnabled: false, MaxNumberOfMessages: 5, ReaderConfig: readerConfig{ @@ -44,20 +48,41 @@ func TestConfig(t *testing.T) { testCases := []struct { name string + queueURL string + s3Bucket string config common.MapStr expectedErr string - expectedCfg func() config + expectedCfg func(queueURL, s3Bucket string) config }{ { - "input with defaults", + "input with defaults for queueURL", + queueURL, + "", common.MapStr{ "queue_url": queueURL, }, "", makeConfig, }, + { + "input with defaults for s3Bucket", + "", + s3Bucket, + common.MapStr{ + "bucket": s3Bucket, + "number_of_workers": 5, + }, + "", + func(queueURL, s3Bucket string) config { + c := makeConfig("", s3Bucket) + c.NumberOfWorkers = 5 + return c + }, + }, { "input with file_selectors", + queueURL, + "", common.MapStr{ "queue_url": queueURL, "file_selectors": []common.MapStr{ @@ -67,8 +92,8 @@ func TestConfig(t *testing.T) { }, }, "", - func() config { - c := makeConfig() + func(queueURL, s3Bucketr string) config { + c := makeConfig(queueURL, "") regex := match.MustCompile("/CloudTrail/") c.FileSelectors = []fileSelectorConfig{ { @@ -79,17 +104,43 @@ func TestConfig(t *testing.T) { return c }, }, + { + "error on no queueURL and s3Bucket", + "", + "", + common.MapStr{ + "queue_url": "", + "bucket": "", + }, + "queue_url or bucket must provided", + nil, + }, + { + "error on both queueURL and s3Bucket", + queueURL, + s3Bucket, + common.MapStr{ + "queue_url": queueURL, + "bucket": s3Bucket, + }, + "queue_url and bucket cannot be set at the same time", + nil, + }, { "error on api_timeout == 0", + queueURL, + "", common.MapStr{ "queue_url": queueURL, "api_timeout": "0", }, - "api_timeout <0s> must be greater than the sqs.wait_time", + "api_timeout <0s> must be greater than the sqs.wait_time <20s", nil, }, { "error on visibility_timeout == 0", + queueURL, + "", common.MapStr{ "queue_url": queueURL, "visibility_timeout": "0", @@ -99,6 +150,8 @@ func TestConfig(t *testing.T) { }, { "error on visibility_timeout > 12h", + queueURL, + "", common.MapStr{ "queue_url": queueURL, "visibility_timeout": "12h1ns", @@ -106,8 +159,32 @@ func TestConfig(t *testing.T) { "visibility_timeout <12h0m0.000000001s> must be greater than 0 and less than or equal to 12h", nil, }, + { + "error on bucket_list_interval == 0", + "", + s3Bucket, + common.MapStr{ + "bucket": s3Bucket, + "bucket_list_interval": "0", + }, + "bucket_list_interval <0s> must be greater than 0", + nil, + }, + { + "error on number_of_workers == 0", + "", + s3Bucket, + common.MapStr{ + "bucket": s3Bucket, + "number_of_workers": "0", + }, + "number_of_workers <0> must be greater than 0", + nil, + }, { "error on max_number_of_messages == 0", + queueURL, + "", common.MapStr{ "queue_url": queueURL, "max_number_of_messages": "0", @@ -117,6 +194,8 @@ func TestConfig(t *testing.T) { }, { "error on buffer_size == 0 ", + queueURL, + "", common.MapStr{ "queue_url": queueURL, "buffer_size": "0", @@ -124,8 +203,21 @@ func TestConfig(t *testing.T) { "buffer_size <0> must be greater than 0", nil, }, + { + "error on max_bytes == 0 ", + queueURL, + "", + common.MapStr{ + "queue_url": queueURL, + "max_bytes": "0", + }, + "max_bytes <0> must be greater than 0", + nil, + }, { 
"error on expand_event_list_from_field and content_type != application/json ", + queueURL, + "", common.MapStr{ "queue_url": queueURL, "expand_event_list_from_field": "Records", @@ -134,6 +226,18 @@ func TestConfig(t *testing.T) { "content_type must be `application/json` when expand_event_list_from_field is used", nil, }, + { + "error on expand_event_list_from_field and content_type != application/json ", + "", + s3Bucket, + common.MapStr{ + "bucket": s3Bucket, + "expand_event_list_from_field": "Records", + "content_type": "text/plain", + }, + "content_type must be `application/json` when expand_event_list_from_field is used", + nil, + }, } for _, tc := range testCases { @@ -152,7 +256,7 @@ func TestConfig(t *testing.T) { if tc.expectedCfg == nil { t.Fatal("missing expected config in test case") } - assert.EqualValues(t, tc.expectedCfg(), c) + assert.EqualValues(t, tc.expectedCfg(tc.queueURL, tc.s3Bucket), c) }) } } diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 82c01778c137..d12b2a24cee5 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -14,56 +14,63 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/libbeat/statestore" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" + "github.com/elastic/go-concert/unison" ) const inputName = "aws-s3" -func Plugin() v2.Plugin { +func Plugin(store beater.StateStore) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, Deprecated: false, Info: "Collect logs from s3", - Manager: v2.ConfigureWith(configure), + Manager: &s3InputManager{store: store}, } } -func configure(cfg *common.Config) (v2.Input, error) { +type s3InputManager struct { + store beater.StateStore +} + +func (im *s3InputManager) Init(grp unison.Group, mode v2.Mode) error { + return nil +} + +func (im *s3InputManager) Create(cfg *common.Config) (v2.Input, error) { config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, err } - return newInput(config) + return newInput(config, im.store) } // s3Input is a input for reading logs from S3 when triggered by an SQS message. 
type s3Input struct { config config awsConfig awssdk.Config + store beater.StateStore } -func newInput(config config) (*s3Input, error) { +func newInput(config config, store beater.StateStore) (*s3Input, error) { awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) if err != nil { return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) } - regionName, err := getRegionFromQueueURL(config.QueueURL, config.AWSConfig.Endpoint) - if err != nil { - return nil, fmt.Errorf("failed to get AWS region from queue_url: %w", err) - } - awsConfig.Region = regionName - return &s3Input{ config: config, awsConfig: awsConfig, + store: store, }, nil } @@ -74,6 +81,21 @@ func (in *s3Input) Test(ctx v2.TestContext) error { } func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { + var err error + + persistentStore, err := in.store.Access() + if err != nil { + return fmt.Errorf("can not access persistent store: %w", err) + } + + defer persistentStore.Close() + + states := newStates(inputContext) + err = states.readStatesFrom(persistentStore) + if err != nil { + return fmt.Errorf("can not start persistent store: %w", err) + } + // Wrap input Context's cancellation Done channel a context.Context. This // goroutine stops with the parent closes the Done channel. ctx, cancelInputCtx := context.WithCancel(context.Background()) @@ -96,15 +118,37 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } defer client.Close() - // Create SQS receiver and S3 notification processor. - receiver, err := in.createSQSReceiver(inputContext, client) - if err != nil { - return fmt.Errorf("failed to initialize sqs receiver: %w", err) + if in.config.QueueURL != "" { + regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint) + if err != nil { + return fmt.Errorf("failed to get AWS region from queue_url: %w", err) + } + + in.awsConfig.Region = regionName + + // Create SQS receiver and S3 notification processor. + receiver, err := in.createSQSReceiver(inputContext, client) + if err != nil { + return fmt.Errorf("failed to initialize sqs receiver: %w", err) + } + defer receiver.metrics.Close() + + if err := receiver.Receive(ctx); err != nil { + return err + } } - defer receiver.metrics.Close() - if err := receiver.Receive(ctx); err != nil { - return err + if in.config.Bucket != "" { + // Create S3 receiver and S3 notification processor. 
+		poller, err := in.createS3Lister(inputContext, client, persistentStore, states)
+		if err != nil {
+			return fmt.Errorf("failed to initialize s3 poller: %w", err)
+		}
+		defer poller.metrics.Close()
+
+		if err := poller.Poll(ctx); err != nil {
+			return err
+		}
	}
	return nil
@@ -149,6 +193,43 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
	return sqsReader, nil
}
+func (in *s3Input) createS3Lister(ctx v2.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) {
+	s3ServiceName := "s3"
+	if in.config.FIPSEnabled {
+		s3ServiceName = "s3-fips"
+	}
+
+	s3API := &awsS3API{
+		client: s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)),
+	}
+
+	log := ctx.Logger.With("s3_bucket", in.config.Bucket)
+	log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers)
+	log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
+	log.Infof("AWS region is set to %v.", in.awsConfig.Region)
+	log.Debugf("AWS S3 service name is %v.", s3ServiceName)
+
+	metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
+	metrics := newInputMetrics(metricRegistry, ctx.ID)
+
+	fileSelectors := in.config.FileSelectors
+	if len(in.config.FileSelectors) == 0 {
+		fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
+	}
+	s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
+	s3Poller := newS3Poller(log.Named("s3_poller"),
+		metrics,
+		s3API,
+		s3EventHandlerFactory,
+		states,
+		persistentStore,
+		in.config.Bucket,
+		in.config.NumberOfWorkers,
+		in.config.BucketListInterval)
+
+	return s3Poller, nil
+}
+
func getRegionFromQueueURL(queueURL string, endpoint string) (string, error) {
	// get region from queueURL
	// Example: https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs
diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 50223b8fc75d..0c7df7e012b0 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go
@@ -14,19 +14,25 @@ import (
	"testing"
	"time"
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/dustin/go-humanize"
	"github.com/olekukonko/tablewriter"
	"github.com/pkg/errors"
+	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/logp"
	"github.com/elastic/beats/v7/libbeat/monitoring"
	pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
+	"github.com/elastic/beats/v7/libbeat/statestore"
+	"github.com/elastic/beats/v7/libbeat/statestore/storetest"
)
const cloudtrailTestFile = "testdata/aws-cloudtrail.json.gz"
+const totalListingObjects = 10000
type constantSQS struct {
	msgs []sqs.Message
@@ -54,13 +60,62 @@ func (_ *constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqs.Mess
	return nil
}
+type s3PagerConstant struct {
+	objects      []s3.Object
+	currentIndex int
+}
+
+var _ s3Pager = (*s3PagerConstant)(nil)
+
+func (c *s3PagerConstant) Next(ctx context.Context) bool {
+	return c.currentIndex < len(c.objects)
+}
+
+func (c *s3PagerConstant) CurrentPage() *s3.ListObjectsOutput {
+	ret := &s3.ListObjectsOutput{}
+	pageSize := 1000
+	if len(c.objects) < c.currentIndex+pageSize {
+		pageSize = len(c.objects) - c.currentIndex
+	}
+
+	ret.Contents
= c.objects[c.currentIndex : c.currentIndex+pageSize] + c.currentIndex = c.currentIndex + pageSize + + return ret +} + +func (c *s3PagerConstant) Err() error { + if c.currentIndex >= len(c.objects) { + c.currentIndex = 0 + } + return nil +} + +func newS3PagerConstant() *s3PagerConstant { + lastModified := time.Now() + ret := &s3PagerConstant{ + currentIndex: 0, + } + + for i := 0; i < totalListingObjects; i++ { + ret.objects = append(ret.objects, s3.Object{ + Key: aws.String(fmt.Sprintf("key-%d.json.gz", i)), + ETag: aws.String(fmt.Sprintf("etag-%d", i)), + LastModified: aws.Time(lastModified), + }) + } + + return ret +} + type constantS3 struct { - filename string - data []byte - contentType string + filename string + data []byte + contentType string + pagerConstant s3Pager } -var _ s3Getter = (*constantS3)(nil) +var _ s3API = (*constantS3)(nil) func newConstantS3(t testing.TB) *constantS3 { data, err := ioutil.ReadFile(cloudtrailTestFile) @@ -79,6 +134,10 @@ func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetO return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil } +func (c constantS3) ListObjectsPaginator(bucket string) s3Pager { + return c.pagerConstant +} + func makeBenchmarkConfig(t testing.TB) config { cfg := common.MustNewConfigFrom(`--- queue_url: foo @@ -95,7 +154,7 @@ file_selectors: return inputConfig } -func benchmarkInput(t *testing.T, maxMessagesInflight int) testing.BenchmarkResult { +func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkResult { return testing.Benchmark(func(b *testing.B) { log := logp.NewLogger(inputName) metricRegistry := monitoring.NewRegistry() @@ -151,21 +210,21 @@ func benchmarkInput(t *testing.T, maxMessagesInflight int) testing.BenchmarkResu }) } -func TestBenchmarkInput(t *testing.T) { +func TestBenchmarkInputSQS(t *testing.T) { logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) results := []testing.BenchmarkResult{ - benchmarkInput(t, 1), - benchmarkInput(t, 2), - benchmarkInput(t, 4), - benchmarkInput(t, 8), - benchmarkInput(t, 16), - benchmarkInput(t, 32), - benchmarkInput(t, 64), - benchmarkInput(t, 128), - benchmarkInput(t, 256), - benchmarkInput(t, 512), - benchmarkInput(t, 1024), + benchmarkInputSQS(t, 1), + benchmarkInputSQS(t, 2), + benchmarkInputSQS(t, 4), + benchmarkInputSQS(t, 8), + benchmarkInputSQS(t, 16), + benchmarkInputSQS(t, 32), + benchmarkInputSQS(t, 64), + benchmarkInputSQS(t, 128), + benchmarkInputSQS(t, 256), + benchmarkInputSQS(t, 512), + benchmarkInputSQS(t, 1024), } headers := []string{ @@ -191,3 +250,119 @@ func TestBenchmarkInput(t *testing.T) { table.AppendBulk(data) table.Render() } + +func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult { + return testing.Benchmark(func(b *testing.B) { + log := logp.NewLogger(inputName) + metricRegistry := monitoring.NewRegistry() + metrics := newInputMetrics(metricRegistry, "test_id") + s3API := newConstantS3(t) + s3API.pagerConstant = newS3PagerConstant() + client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) { + event.Private.(*eventACKTracker).ACK() + }) + + defer close(client.Channel) + conf := makeBenchmarkConfig(t) + + storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) + store, err := storeReg.Get("test") + if err != nil { + t.Fatalf("Failed to access store: %v", err) + } + + err = store.Set(awsS3WriteCommitPrefix+"bucket", &commitWriteState{time.Time{}}) + if err != nil { + t.Fatalf("Failed to reset store: %v", err) + } + + 
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors) + s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", numberOfWorkers, time.Second) + + ctx, cancel := context.WithCancel(context.Background()) + b.Cleanup(cancel) + + go func() { + for metrics.s3ObjectsAckedTotal.Get() < totalListingObjects { + time.Sleep(5 * time.Millisecond) + } + cancel() + }() + + b.ResetTimer() + start := time.Now() + if err := s3Poller.Poll(ctx); err != nil { + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatal(err) + } + } + b.StopTimer() + elapsed := time.Since(start) + + b.ReportMetric(float64(numberOfWorkers), "number_of_workers") + b.ReportMetric(elapsed.Seconds(), "sec") + + b.ReportMetric(float64(metrics.s3EventsCreatedTotal.Get()), "events") + b.ReportMetric(float64(metrics.s3EventsCreatedTotal.Get())/elapsed.Seconds(), "events_per_sec") + + b.ReportMetric(float64(metrics.s3BytesProcessedTotal.Get()), "s3_bytes") + b.ReportMetric(float64(metrics.s3BytesProcessedTotal.Get())/elapsed.Seconds(), "s3_bytes_per_sec") + + b.ReportMetric(float64(metrics.s3ObjectsListedTotal.Get()), "objects_listed") + b.ReportMetric(float64(metrics.s3ObjectsListedTotal.Get())/elapsed.Seconds(), "objects_listed_per_sec") + + b.ReportMetric(float64(metrics.s3ObjectsProcessedTotal.Get()), "objects_processed") + b.ReportMetric(float64(metrics.s3ObjectsProcessedTotal.Get())/elapsed.Seconds(), "objects_processed_per_sec") + + b.ReportMetric(float64(metrics.s3ObjectsAckedTotal.Get()), "objects_acked") + b.ReportMetric(float64(metrics.s3ObjectsAckedTotal.Get())/elapsed.Seconds(), "objects_acked_per_sec") + + }) +} + +func TestBenchmarkInputS3(t *testing.T) { + logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + + results := []testing.BenchmarkResult{ + benchmarkInputS3(t, 1), + benchmarkInputS3(t, 2), + benchmarkInputS3(t, 4), + benchmarkInputS3(t, 8), + benchmarkInputS3(t, 16), + benchmarkInputS3(t, 32), + benchmarkInputS3(t, 64), + benchmarkInputS3(t, 128), + benchmarkInputS3(t, 256), + benchmarkInputS3(t, 512), + benchmarkInputS3(t, 1024), + } + + headers := []string{ + "Number of workers", + "Objects listed per sec", + "Objects processed per sec", + "Objects acked per sec", + "Events per sec", + "S3 Bytes per sec", + "Time (sec)", + "CPUs", + } + var data [][]string + for _, r := range results { + data = append(data, []string{ + fmt.Sprintf("%v", r.Extra["number_of_workers"]), + fmt.Sprintf("%v", r.Extra["objects_listed_per_sec"]), + fmt.Sprintf("%v", r.Extra["objects_processed_per_sec"]), + fmt.Sprintf("%v", r.Extra["objects_acked_per_sec"]), + fmt.Sprintf("%v", r.Extra["events_per_sec"]), + fmt.Sprintf("%v", humanize.Bytes(uint64(r.Extra["s3_bytes_per_sec"]))), + fmt.Sprintf("%v", r.Extra["sec"]), + fmt.Sprintf("%v", runtime.GOMAXPROCS(0)), + }) + } + + table := tablewriter.NewWriter(os.Stdout) + table.SetHeader(headers) + table.AppendBulk(data) + table.Render() +} diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 0580b6f067b2..1ce157c553f8 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -19,6 +19,8 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/filebeat/beater" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/external" "github.com/aws/aws-sdk-go-v2/service/s3/s3manager" @@ -32,6 +34,8 @@ import ( 
"github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" ) const ( @@ -69,7 +73,35 @@ func getTerraformOutputs(t *testing.T) terraformOutputData { return rtn } -func makeTestConfig(queueURL string) *common.Config { +func makeTestConfigS3(s3bucket string) *common.Config { + return common.MustNewConfigFrom(fmt.Sprintf(`--- +bucket: aws:s3:::%s +number_of_workers: 1 +file_selectors: +- + regex: 'events-array.json$' + expand_event_list_from_field: Events + content_type: application/json + include_s3_metadata: + - last-modified + - x-amz-version-id + - x-amz-storage-class + - Content-Length + - Content-Type +- + regex: '\.(?:nd)?json(\.gz)?$' + content_type: application/json +- + regex: 'multiline.txt$' + parsers: + - multiline: + pattern: "^ is not in event", key) - } + event := s3EventV2{} + event.S3.Bucket.Name = bucketName + event.S3.Object.Key = filename - dec := json.NewDecoder(bytes.NewReader(raw)) - dec.UseNumber() + acker := newEventACKTracker(ctx) - tok, err := dec.Token() - if err != nil { - return err - } - delim, ok := tok.(json.Delim) - if !ok || delim != '[' { - return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key) - } + s3Processor := p.s3ObjectHandler.Create(ctx, p.log, acker, event) + if s3Processor == nil { + continue + } - for dec.More() { - arrayOffset := dec.InputOffset() + totProcessableObjects++ - var item json.RawMessage - if err := dec.Decode(&item); err != nil { - return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err) + s3ObjectPayloadChanByPage <- &s3ObjectPayload{ + s3ObjectHandler: s3Processor, + s3ObjectInfo: s3ObjectInfo{ + name: bucketName, + key: filename, + etag: *object.ETag, + lastModified: *object.LastModified, + listingID: listingID.String(), + }, + s3ObjectEvent: event, + } } - data, _ := item.MarshalJSON() - evt := createEvent(string(data), offset+arrayOffset, p.s3Obj, objHash, p.s3Metadata) - p.publish(p.acker, &evt) - } - - return nil -} + if totProcessableObjects == 0 { + // nothing to be ACKed, unlock here + p.states.DeleteListing(listingID.String()) + lock.Unlock() + } else { + listingInfo := &listingInfo{totObjects: totProcessableObjects} + p.states.AddListing(listingID.String(), listingInfo) -func (p *s3ObjectProcessor) readFile(r io.Reader) error { - encodingFactory, ok := encoding.FindEncoding(p.readerConfig.Encoding) - if !ok || encodingFactory == nil { - return fmt.Errorf("failed to find '%v' encoding", p.readerConfig.Encoding) - } + // Metrics + p.metrics.s3ObjectsProcessedTotal.Add(uint64(totProcessableObjects)) + } - enc, err := encodingFactory(r) - if err != nil { - return fmt.Errorf("failed to initialize encoding: %w", err) + close(s3ObjectPayloadChanByPage) + for s3ObjectPayload := range s3ObjectPayloadChanByPage { + s3ObjectPayloadChan <- s3ObjectPayload + } } - var reader reader.Reader - reader, err = readfile.NewEncodeReader(ioutil.NopCloser(r), readfile.Config{ - Codec: enc, - BufferSize: int(p.readerConfig.BufferSize), - Terminator: p.readerConfig.LineTerminator, - MaxBytes: int(p.readerConfig.MaxBytes) * 4, - }) - if err != nil { - return fmt.Errorf("failed to create encode reader: %w", err) + if err := paginator.Err(); err != nil { + p.log.Warnw("Error when paginating listing.", "error", err) } - reader = readfile.NewStripNewline(reader, 
p.readerConfig.LineTerminator) - reader = p.readerConfig.Parsers.Create(reader) - reader = readfile.NewLimitReader(reader, int(p.readerConfig.MaxBytes)) + return +} - var offset int64 - for { - message, err := reader.Next() - if err == io.EOF { - // No more lines - break - } - if err != nil { - return fmt.Errorf("error reading message: %w", err) +func (p *s3Poller) Purge() { + listingIDs := p.states.GetListingIDs() + for _, listingID := range listingIDs { + // we lock here in order to process the purge only after + // full listing page is ACKed by all the workers + lock, loaded := p.workersListingMap.Load(listingID) + if !loaded { + // purge calls can overlap, GetListingIDs can return + // an outdated snapshot with listing already purged + p.states.DeleteListing(listingID) + continue } - event := createEvent(string(message.Content), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) - event.Fields.DeepUpdate(message.Fields) - offset += int64(message.Bytes) - p.publish(p.acker, &event) - } + lock.(*sync.Mutex).Lock() - return nil -} + keys := map[string]struct{}{} + latestStoredTimeByBucket := make(map[string]time.Time, 0) -func (p *s3ObjectProcessor) publish(ack *eventACKTracker, event *beat.Event) { - ack.Add(1) - event.Private = ack - p.metrics.s3EventsCreatedTotal.Inc() - p.publisher.Publish(*event) -} + for _, state := range p.states.GetStatesByListingID(listingID) { + // it is not stored, keep + if !state.Stored { + continue + } -func createEvent(message string, offset int64, obj s3EventV2, objectHash string, meta map[string]interface{}) beat.Event { - event := beat.Event{ - Timestamp: time.Now().UTC(), - Fields: common.MapStr{ - "message": message, - "log": common.MapStr{ - "offset": offset, - "file": common.MapStr{ - "path": constructObjectURL(obj), - }, - }, - "aws": common.MapStr{ - "s3": common.MapStr{ - "bucket": common.MapStr{ - "name": obj.S3.Bucket.Name, - "arn": obj.S3.Bucket.ARN}, - "object": common.MapStr{ - "key": obj.S3.Object.Key, - }, - }, - }, - "cloud": common.MapStr{ - "provider": "aws", - "region": obj.AWSRegion, - }, - }, - } - event.SetID(objectID(objectHash, offset)) + var latestStoredTime time.Time + keys[state.ID] = struct{}{} + latestStoredTime, ok := latestStoredTimeByBucket[state.Bucket] + if !ok { + var commitWriteState commitWriteState + err := p.store.Get(awsS3WriteCommitPrefix+state.Bucket, &commitWriteState) + if err == nil { + // we have no entry in the map and we have no entry in the store + // set zero time + latestStoredTime = time.Time{} + } else { + latestStoredTime = commitWriteState.Time + } + } - if len(meta) > 0 { - event.Fields.Put("aws.s3.metadata", meta) - } + if state.LastModified.After(latestStoredTime) { + latestStoredTimeByBucket[state.Bucket] = state.LastModified + } - return event -} + } -func objectID(objectHash string, offset int64) string { - return fmt.Sprintf("%s-%012d", objectHash, offset) -} + for key := range keys { + p.states.Delete(key) + } -func constructObjectURL(obj s3EventV2) string { - return "https://" + obj.S3.Bucket.Name + ".s3." + obj.AWSRegion + ".amazonaws.com/" + obj.S3.Object.Key -} + if err := p.states.writeStates(p.store); err != nil { + p.log.Errorw("Failed to write states to the registry", "error", err) + } -// s3ObjectHash returns a short sha256 hash of the bucket arn + object key name. 
-func s3ObjectHash(obj s3EventV2) string { - h := sha256.New() - h.Write([]byte(obj.S3.Bucket.ARN)) - h.Write([]byte(obj.S3.Object.Key)) - prefix := hex.EncodeToString(h.Sum(nil)) - return prefix[:10] -} + for bucket, latestStoredTime := range latestStoredTimeByBucket { + if err := p.store.Set(awsS3WriteCommitPrefix+bucket, commitWriteState{latestStoredTime}); err != nil { + p.log.Errorw("Failed to write commit time to the registry", "error", err) + } + } -// isStreamGzipped determines whether the given stream of bytes (encapsulated in a buffered reader) -// represents gzipped content or not. A buffered reader is used so the function can peek into the byte -// stream without consuming it. This makes it convenient for code executed after this function call -// to consume the stream if it wants. -func isStreamGzipped(r *bufio.Reader) (bool, error) { - // Why 512? See https://godoc.org/net/http#DetectContentType - buf, err := r.Peek(512) - if err != nil && err != io.EOF { - return false, err + // purge is done, we can unlock and clean + lock.(*sync.Mutex).Unlock() + p.workersListingMap.Delete(listingID) + p.states.DeleteListing(listingID) } - switch http.DetectContentType(buf) { - case "application/x-gzip", "application/zip": - return true, nil - default: - return false, nil - } + return } -// s3Metadata returns a map containing the selected S3 object metadata keys. -func s3Metadata(resp *s3.GetObjectResponse, keys ...string) common.MapStr { - if len(keys) == 0 { - return nil - } - - // When you upload objects using the REST API, the optional user-defined - // metadata names must begin with "x-amz-meta-" to distinguish them from - // other HTTP headers. - const userMetaPrefix = "x-amz-meta-" - - allMeta := map[string]interface{}{} - - // Get headers using AWS SDK struct tags. - fields := reflect.TypeOf(resp.GetObjectOutput).Elem() - values := reflect.ValueOf(resp.GetObjectOutput).Elem() - for i := 0; i < fields.NumField(); i++ { - f := fields.Field(i) - - if loc, _ := f.Tag.Lookup("location"); loc != "header" { - continue +func (p *s3Poller) Poll(ctx context.Context) error { + // This loop tries to keep the workers busy as much as possible while + // honoring the number in config opposed to a simpler loop that does one + // listing, sequentially processes every object and then does another listing + workerWg := new(sync.WaitGroup) + for ctx.Err() == nil { + // Determine how many S3 workers are available. + workers, err := p.workerSem.AcquireContext(p.numberOfWorkers, ctx) + if err != nil { + break } - name, found := f.Tag.Lookup("locationName") - if !found { + if workers == 0 { continue } - name = strings.ToLower(name) - if name == userMetaPrefix { - continue - } + s3ObjectPayloadChan := make(chan *s3ObjectPayload) - v := values.Field(i) - switch v.Kind() { - case reflect.Ptr: - if v.IsNil() { - continue - } - v = v.Elem() - default: - if v.IsZero() { - continue - } + workerWg.Add(1) + go func() { + defer func() { + workerWg.Done() + }() + + p.GetS3Objects(ctx, s3ObjectPayloadChan) + p.Purge() + }() + + workerWg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer func() { + workerWg.Done() + p.workerSem.Release(1) + }() + if err := p.ProcessObject(s3ObjectPayloadChan); err != nil { + p.log.Warnw("Failed processing S3 listing.", "error", err) + } + }() } - allMeta[name] = v.Interface() - } + timed.Wait(ctx, p.bucketPollInterval) - // Add in the user defined headers. 
- for k, v := range resp.Metadata { - k = strings.ToLower(k) - allMeta[userMetaPrefix+k] = v } - // Select the matching headers from the config. - metadata := common.MapStr{} - for _, key := range keys { - key = strings.ToLower(key) - - v, found := allMeta[key] - if !found { - continue - } + // Wait for all workers to finish. + workerWg.Wait() - metadata[key] = v + if errors.Is(ctx.Err(), context.Canceled) { + // A canceled context is a normal shutdown. + return nil } - - return metadata + return ctx.Err() } diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go new file mode 100644 index 000000000000..f486fbac0321 --- /dev/null +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -0,0 +1,453 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "bufio" + "bytes" + "compress/gzip" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "reflect" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/readfile" + "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" +) + +const ( + contentTypeJSON = "application/json" + contentTypeNDJSON = "application/x-ndjson" +) + +type s3ObjectProcessorFactory struct { + log *logp.Logger + metrics *inputMetrics + s3 s3Getter + publisher beat.Client + fileSelectors []fileSelectorConfig +} + +func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3Getter, publisher beat.Client, sel []fileSelectorConfig) *s3ObjectProcessorFactory { + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + if len(sel) == 0 { + sel = []fileSelectorConfig{ + {ReaderConfig: defaultConfig().ReaderConfig}, + } + } + return &s3ObjectProcessorFactory{ + log: log, + metrics: metrics, + s3: s3, + publisher: publisher, + fileSelectors: sel, + } +} + +func (f *s3ObjectProcessorFactory) findReaderConfig(key string) *readerConfig { + for _, sel := range f.fileSelectors { + if sel.Regex == nil || sel.Regex.MatchString(key) { + return &sel.ReaderConfig + } + } + return nil +} + +// Create returns a new s3ObjectProcessor. It returns nil when no file selectors +// match the S3 object key. +func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *eventACKTracker, obj s3EventV2) s3ObjectHandler { + log = log.With( + "s3_bucket", obj.S3.Bucket.Name, + "s3_object", obj.S3.Object.Key) + + readerConfig := f.findReaderConfig(obj.S3.Object.Key) + if readerConfig == nil { + log.Debug("Skipping S3 object processing. 
No file_selectors are a match.") + return nil + } + + return &s3ObjectProcessor{ + s3ObjectProcessorFactory: f, + log: log, + ctx: ctx, + acker: ack, + readerConfig: readerConfig, + s3Obj: obj, + s3ObjHash: s3ObjectHash(obj), + } +} + +type s3ObjectProcessor struct { + *s3ObjectProcessorFactory + + log *logp.Logger + ctx context.Context + acker *eventACKTracker // ACKer tied to the SQS message (multiple S3 readers share an ACKer when the S3 notification event contains more than one S3 object). + readerConfig *readerConfig // Config about how to process the object. + s3Obj s3EventV2 // S3 object information. + s3ObjHash string + + s3Metadata map[string]interface{} // S3 object metadata. +} + +func (p *s3ObjectProcessor) Wait() { + p.acker.Wait() +} + +func (p *s3ObjectProcessor) ProcessS3Object() error { + if p == nil { + return nil + } + + // Metrics and Logging + { + p.log.Debug("Begin S3 object processing.") + p.metrics.s3ObjectsRequestedTotal.Inc() + p.metrics.s3ObjectsInflight.Inc() + start := time.Now() + defer func() { + elapsed := time.Since(start) + p.metrics.s3ObjectsInflight.Dec() + p.metrics.s3ObjectProcessingTime.Update(elapsed.Nanoseconds()) + p.log.Debugw("End S3 object processing.", "elapsed_time_ns", elapsed) + }() + } + + // Request object (download). + contentType, meta, body, err := p.download() + if err != nil { + return errors.Wrap(err, "failed to get s3 object") + } + defer body.Close() + p.s3Metadata = meta + + reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(body, p.metrics.s3BytesProcessedTotal)) + if err != nil { + return errors.Wrap(err, "failed checking for gzip content") + } + + // Overwrite with user configured Content-Type. + if p.readerConfig.ContentType != "" { + contentType = p.readerConfig.ContentType + } + + // Process object content stream. + switch { + case contentType == contentTypeJSON || contentType == contentTypeNDJSON: + err = p.readJSON(reader) + default: + err = p.readFile(reader) + } + if err != nil { + return err + } + + return nil +} + +// download requests the S3 object from AWS and returns the object's +// Content-Type and reader to get the object's contents. The caller must +// close the returned reader. +func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]interface{}, body io.ReadCloser, err error) { + resp, err := p.s3.GetObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key) + if err != nil { + return "", nil, nil, err + } + meta := s3Metadata(resp, p.readerConfig.IncludeS3Metadata...) 
+ return *resp.ContentType, meta, resp.Body, nil +} + +func (p *s3ObjectProcessor) addGzipDecoderIfNeeded(body io.Reader) (io.Reader, error) { + bufReader := bufio.NewReader(body) + + gzipped, err := isStreamGzipped(bufReader) + if err != nil { + return nil, err + } + if !gzipped { + return bufReader, nil + } + + return gzip.NewReader(bufReader) +} + +func (p *s3ObjectProcessor) readJSON(r io.Reader) error { + dec := json.NewDecoder(r) + dec.UseNumber() + + for dec.More() && p.ctx.Err() == nil { + offset := dec.InputOffset() + + var item json.RawMessage + if err := dec.Decode(&item); err != nil { + return fmt.Errorf("failed to decode json: %w", err) + } + + if p.readerConfig.ExpandEventListFromField != "" { + if err := p.splitEventList(p.readerConfig.ExpandEventListFromField, item, offset, p.s3ObjHash); err != nil { + return err + } + continue + } + + data, _ := item.MarshalJSON() + evt := createEvent(string(data), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) + p.publish(p.acker, &evt) + } + + return nil +} + +func (p *s3ObjectProcessor) splitEventList(key string, raw json.RawMessage, offset int64, objHash string) error { + var jsonObject map[string]json.RawMessage + if err := json.Unmarshal(raw, &jsonObject); err != nil { + return err + } + + raw, found := jsonObject[key] + if !found { + return fmt.Errorf("expand_event_list_from_field key <%v> is not in event", key) + } + + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.UseNumber() + + tok, err := dec.Token() + if err != nil { + return err + } + delim, ok := tok.(json.Delim) + if !ok || delim != '[' { + return fmt.Errorf("expand_event_list_from_field <%v> is not an array", key) + } + + for dec.More() { + arrayOffset := dec.InputOffset() + + var item json.RawMessage + if err := dec.Decode(&item); err != nil { + return fmt.Errorf("failed to decode array item at offset %d: %w", offset+arrayOffset, err) + } + + data, _ := item.MarshalJSON() + evt := createEvent(string(data), offset+arrayOffset, p.s3Obj, objHash, p.s3Metadata) + p.publish(p.acker, &evt) + } + + return nil +} + +func (p *s3ObjectProcessor) readFile(r io.Reader) error { + encodingFactory, ok := encoding.FindEncoding(p.readerConfig.Encoding) + if !ok || encodingFactory == nil { + return fmt.Errorf("failed to find '%v' encoding", p.readerConfig.Encoding) + } + + enc, err := encodingFactory(r) + if err != nil { + return fmt.Errorf("failed to initialize encoding: %w", err) + } + + var reader reader.Reader + reader, err = readfile.NewEncodeReader(ioutil.NopCloser(r), readfile.Config{ + Codec: enc, + BufferSize: int(p.readerConfig.BufferSize), + Terminator: p.readerConfig.LineTerminator, + MaxBytes: int(p.readerConfig.MaxBytes) * 4, + }) + if err != nil { + return fmt.Errorf("failed to create encode reader: %w", err) + } + + reader = readfile.NewStripNewline(reader, p.readerConfig.LineTerminator) + reader = p.readerConfig.Parsers.Create(reader) + reader = readfile.NewLimitReader(reader, int(p.readerConfig.MaxBytes)) + + var offset int64 + for { + message, err := reader.Next() + if err == io.EOF { + // No more lines + break + } + if err != nil { + return fmt.Errorf("error reading message: %w", err) + } + + event := createEvent(string(message.Content), offset, p.s3Obj, p.s3ObjHash, p.s3Metadata) + event.Fields.DeepUpdate(message.Fields) + offset += int64(message.Bytes) + p.publish(p.acker, &event) + } + + return nil +} + +func (p *s3ObjectProcessor) publish(ack *eventACKTracker, event *beat.Event) { + ack.Add() + event.Private = ack + p.metrics.s3EventsCreatedTotal.Inc() + 
p.publisher.Publish(*event) +} + +func createEvent(message string, offset int64, obj s3EventV2, objectHash string, meta map[string]interface{}) beat.Event { + event := beat.Event{ + Timestamp: time.Now().UTC(), + Fields: common.MapStr{ + "message": message, + "log": common.MapStr{ + "offset": offset, + "file": common.MapStr{ + "path": constructObjectURL(obj), + }, + }, + "aws": common.MapStr{ + "s3": common.MapStr{ + "bucket": common.MapStr{ + "name": obj.S3.Bucket.Name, + "arn": obj.S3.Bucket.ARN}, + "object": common.MapStr{ + "key": obj.S3.Object.Key, + }, + }, + }, + "cloud": common.MapStr{ + "provider": "aws", + "region": obj.AWSRegion, + }, + }, + } + event.SetID(objectID(objectHash, offset)) + + if len(meta) > 0 { + event.Fields.Put("aws.s3.metadata", meta) + } + + return event +} + +func objectID(objectHash string, offset int64) string { + return fmt.Sprintf("%s-%012d", objectHash, offset) +} + +func constructObjectURL(obj s3EventV2) string { + return "https://" + obj.S3.Bucket.Name + ".s3." + obj.AWSRegion + ".amazonaws.com/" + obj.S3.Object.Key +} + +// s3ObjectHash returns a short sha256 hash of the bucket arn + object key name. +func s3ObjectHash(obj s3EventV2) string { + h := sha256.New() + h.Write([]byte(obj.S3.Bucket.ARN)) + h.Write([]byte(obj.S3.Object.Key)) + prefix := hex.EncodeToString(h.Sum(nil)) + return prefix[:10] +} + +// isStreamGzipped determines whether the given stream of bytes (encapsulated in a buffered reader) +// represents gzipped content or not. A buffered reader is used so the function can peek into the byte +// stream without consuming it. This makes it convenient for code executed after this function call +// to consume the stream if it wants. +func isStreamGzipped(r *bufio.Reader) (bool, error) { + // Why 512? See https://godoc.org/net/http#DetectContentType + buf, err := r.Peek(512) + if err != nil && err != io.EOF { + return false, err + } + + switch http.DetectContentType(buf) { + case "application/x-gzip", "application/zip": + return true, nil + default: + return false, nil + } +} + +// s3Metadata returns a map containing the selected S3 object metadata keys. +func s3Metadata(resp *s3.GetObjectResponse, keys ...string) common.MapStr { + if len(keys) == 0 { + return nil + } + + // When you upload objects using the REST API, the optional user-defined + // metadata names must begin with "x-amz-meta-" to distinguish them from + // other HTTP headers. + const userMetaPrefix = "x-amz-meta-" + + allMeta := map[string]interface{}{} + + // Get headers using AWS SDK struct tags. + fields := reflect.TypeOf(resp.GetObjectOutput).Elem() + values := reflect.ValueOf(resp.GetObjectOutput).Elem() + for i := 0; i < fields.NumField(); i++ { + f := fields.Field(i) + + if loc, _ := f.Tag.Lookup("location"); loc != "header" { + continue + } + + name, found := f.Tag.Lookup("locationName") + if !found { + continue + } + name = strings.ToLower(name) + + if name == userMetaPrefix { + continue + } + + v := values.Field(i) + switch v.Kind() { + case reflect.Ptr: + if v.IsNil() { + continue + } + v = v.Elem() + default: + if v.IsZero() { + continue + } + } + + allMeta[name] = v.Interface() + } + + // Add in the user defined headers. + for k, v := range resp.Metadata { + k = strings.ToLower(k) + allMeta[userMetaPrefix+k] = v + } + + // Select the matching headers from the config. 
+ metadata := common.MapStr{} + for _, key := range keys { + key = strings.ToLower(key) + + v, found := allMeta[key] + if !found { + continue + } + + metadata[key] = v + } + + return metadata +} diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go new file mode 100644 index 000000000000..6cf1ea1fa5a8 --- /dev/null +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -0,0 +1,261 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "bytes" + "context" + "errors" + "io/ioutil" + "path/filepath" + "strings" + "testing" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func newS3Object(t testing.TB, filename, contentType string) (s3EventV2, *s3.GetObjectResponse) { + data, err := ioutil.ReadFile(filename) + if err != nil { + t.Fatal(err) + } + + return newS3Event(filename), newS3GetObjectResponse(filename, data, contentType) +} + +func newS3GetObjectResponse(filename string, data []byte, contentType string) *s3.GetObjectResponse { + r := bytes.NewReader(data) + contentLen := int64(r.Len()) + resp := &s3.GetObjectResponse{ + GetObjectOutput: &s3.GetObjectOutput{ + Body: ioutil.NopCloser(r), + ContentLength: &contentLen, + ContentType: &contentType, + }, + } + switch strings.ToLower(filepath.Ext(filename)) { + case ".gz": + gzipEncoding := "gzip" + resp.ContentEncoding = &gzipEncoding + } + return resp +} + +func TestS3ObjectProcessor(t *testing.T) { + logp.TestingSetup() + + t.Run("download text/plain file", func(t *testing.T) { + testProcessS3Object(t, "testdata/log.txt", "text/plain", 2) + }) + + t.Run("multiline content", func(t *testing.T) { + sel := fileSelectorConfig{ReaderConfig: readerConfig{}} + sel.ReaderConfig.InitDefaults() + + // Unfortunately the config structs for the parser package are not + // exported to use config parsing. 
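// Instead, the test builds the multiline parser settings from a raw map via
// common.MustNewConfigFrom, mirroring how they would be supplied in the
// input's YAML configuration.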
+ cfg := common.MustNewConfigFrom(map[string]interface{}{ + "parsers": []map[string]interface{}{ + { + "multiline": map[string]interface{}{ + "pattern": "^ len(s3Objects) { + endIdx = len(s3Objects) + } + return &s3.ListObjectsOutput{ + Contents: s3Objects[startIdx:endIdx], + } + }) + mockS3Pager.EXPECT().Err().Return(nil) + + return mockS3Pager +} diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 6cf1ea1fa5a8..a02f3a58495c 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -5,257 +5,266 @@ package awss3 import ( - "bytes" "context" - "errors" - "io/ioutil" - "path/filepath" - "strings" "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + + "github.com/aws/aws-sdk-go-v2/aws" - awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" ) -func newS3Object(t testing.TB, filename, contentType string) (s3EventV2, *s3.GetObjectResponse) { - data, err := ioutil.ReadFile(filename) - if err != nil { - t.Fatal(err) - } - - return newS3Event(filename), newS3GetObjectResponse(filename, data, contentType) -} - -func newS3GetObjectResponse(filename string, data []byte, contentType string) *s3.GetObjectResponse { - r := bytes.NewReader(data) - contentLen := int64(r.Len()) - resp := &s3.GetObjectResponse{ - GetObjectOutput: &s3.GetObjectOutput{ - Body: ioutil.NopCloser(r), - ContentLength: &contentLen, - ContentType: &contentType, - }, - } - switch strings.ToLower(filepath.Ext(filename)) { - case ".gz": - gzipEncoding := "gzip" - resp.ContentEncoding = &gzipEncoding - } - return resp -} - -func TestS3ObjectProcessor(t *testing.T) { +func TestS3Poller(t *testing.T) { logp.TestingSetup() + const bucket = "bucket" + const numberOfWorkers = 5 + const pollInterval = 2 * time.Second + const testTimeout = 1 * time.Second + + t.Run("Poll success", func(t *testing.T) { + storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) + store, err := storeReg.Get("test") + if err != nil { + t.Fatalf("Failed to access store: %v", err) + } - t.Run("download text/plain file", func(t *testing.T) { - testProcessS3Object(t, "testdata/log.txt", "text/plain", 2) - }) - - t.Run("multiline content", func(t *testing.T) { - sel := fileSelectorConfig{ReaderConfig: readerConfig{}} - sel.ReaderConfig.InitDefaults() - - // Unfortunately the config structs for the parser package are not - // exported to use config parsing. 
- cfg := common.MustNewConfigFrom(map[string]interface{}{ - "parsers": []map[string]interface{}{ - { - "multiline": map[string]interface{}{ - "pattern": "^ len(s3Objects) { - endIdx = len(s3Objects) - } - return &s3.ListObjectsOutput{ - Contents: s3Objects[startIdx:endIdx], - } + s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) + receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, numberOfWorkers, pollInterval) + require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) + assert.Equal(t, numberOfWorkers, receiver.workerSem.available) }) - mockS3Pager.EXPECT().Err().Return(nil) - - return mockS3Pager } diff --git a/x-pack/filebeat/input/awss3/state.go b/x-pack/filebeat/input/awss3/state.go new file mode 100644 index 000000000000..11b20652bfd1 --- /dev/null +++ b/x-pack/filebeat/input/awss3/state.go @@ -0,0 +1,75 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "fmt" + "time" +) + +// state is used to communicate the publishing state of a s3 object +type state struct { + // ID is used to identify the state in the store, and it is composed by + // Bucket + Key + Etag + LastModified.String(): changing this value or how it is + // composed will break backward compatibilities with entries already in the store. + ID string `json:"id" struct:"id"` + Bucket string `json:"bucket" struct:"bucket"` + Key string `json:"key" struct:"key"` + Etag string `json:"etag" struct:"etag"` + LastModified time.Time `json:"last_modified" struct:"last_modified"` + + // A state has Stored = true when all events are ACKed. + Stored bool `json:"stored" struct:"stored"` + // A state has Error = true when ProcessS3Object returned an error + Error bool `json:"error" struct:"error"` +} + +// newState creates a new s3 object state +func newState(bucket, key, etag string, lastModified time.Time) state { + s := state{ + Bucket: bucket, + Key: key, + LastModified: lastModified, + Etag: etag, + Stored: false, + Error: false, + } + + s.ID = s.Bucket + s.Key + s.Etag + s.LastModified.String() + + return s +} + +// MarkAsStored set the stored flag to true +func (s *state) MarkAsStored() { + s.Stored = true +} + +// MarkAsError set the error flag to true +func (s *state) MarkAsError() { + s.Error = true +} + +// IsEqual checks if the two states point to the same s3 object. +func (s *state) IsEqual(c *state) bool { + return s.Bucket == c.Bucket && s.Key == c.Key && s.Etag == c.Etag && s.LastModified.Equal(c.LastModified) +} + +// IsEmpty checks if the state is empty +func (s *state) IsEmpty() bool { + c := state{} + return s.Bucket == c.Bucket && s.Key == c.Key && s.Etag == c.Etag && s.LastModified.Equal(c.LastModified) +} + +// String returns string representation of the struct +func (s *state) String() string { + return fmt.Sprintf( + "{ID: %v, Bucket: %v, Key: %v, Etag: %v, LastModified: %v}", + s.ID, + s.Bucket, + s.Key, + s.Etag, + s.LastModified) +} diff --git a/x-pack/filebeat/input/awss3/state_test.go b/x-pack/filebeat/input/awss3/state_test.go new file mode 100644 index 000000000000..b304d74920fc --- /dev/null +++ b/x-pack/filebeat/input/awss3/state_test.go @@ -0,0 +1,152 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStateIsEqual(t *testing.T) { + type stateTestCase struct { + states [2]state + isSame bool + } + + lastModifed := time.Now() + tests := map[string]stateTestCase{ + "two states pointing to the same key with same etag and same last modified not stored": { + [2]state{ + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + }, + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + }, + }, + true, + }, + "two states pointing to the same key with same etag and same last modified stored": { + [2]state{ + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + Stored: true, + }, + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + }, + }, + true, + }, + "two states pointing to the same key with same etag and same last modified error": { + [2]state{ + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + Error: true, + }, + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + }, + }, + true, + }, + "two states pointing to the same key with different etag and same last modified": { + [2]state{ + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag1", + LastModified: lastModifed, + }, + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag2", + LastModified: lastModifed, + }, + }, + false, + }, + "two states pointing to the same key with same etag and different last modified": { + [2]state{ + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: time.Now(), + }, + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: time.Now().Add(10 * time.Second), + }, + }, + false, + }, + "two states pointing to different key": { + [2]state{ + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + }, + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/2", + Etag: "etag", + LastModified: lastModifed, + }, + }, + false, + }, + "two states pointing to different bucket": { + [2]state{ + state{ + Bucket: "bucket b", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + }, + state{ + Bucket: "bucket a", + Key: "/key/to/this/file/1", + Etag: "etag", + LastModified: lastModifed, + }, + }, + false, + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + isSame := test.states[0].IsEqual(&test.states[1]) + assert.Equal(t, test.isSame, isSame) + }) + } +} diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go new file mode 100644 index 000000000000..6674ee104c10 --- /dev/null +++ b/x-pack/filebeat/input/awss3/states.go @@ -0,0 +1,342 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
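// The registry implemented below backs the bucket-polling mode: every object
// returned by a listing gets a state keyed on bucket, key, etag and last
// modified time; per-listing counters track how many of those objects have
// been stored or have failed; and MustSkip uses the bucket's last write-commit
// timestamp (awsS3WriteCommitPrefix) to skip objects that are unchanged or
// predate that commit.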
+ +package awss3 + +import ( + "strings" + "sync" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/libbeat/statestore" +) + +const ( + awsS3ObjectStatePrefix = "filebeat::aws-s3::state::" + awsS3WriteCommitPrefix = "filebeat::aws-s3::writeCommit::" +) + +type listingInfo struct { + totObjects int + storedObjects int + errorObjects int + finalCheck bool +} + +// states handles list of s3 object state. One must use newStates to instantiate a +// file states registry. Using the zero-value is not safe. +type states struct { + sync.RWMutex + + log *logp.Logger + + // states store + states []state + + // idx maps state IDs to state indexes for fast lookup and modifications. + idx map[string]int + + listingIDs map[string]struct{} + listingInfo *sync.Map + statesByListingID map[string][]state +} + +// newStates generates a new states registry. +func newStates(ctx v2.Context) *states { + return &states{ + log: ctx.Logger.Named("states"), + states: nil, + idx: map[string]int{}, + listingInfo: new(sync.Map), + listingIDs: map[string]struct{}{}, + statesByListingID: map[string][]state{}, + } +} + +func (s *states) MustSkip(state state, store *statestore.Store) bool { + if !s.IsNew(state) { + return true + } + + previousState := s.FindPrevious(state) + + // status is forgotten. if there is no previous state and + // the state.LastModified is before the last cleanStore + // write commit we can remove + var commitWriteState commitWriteState + err := store.Get(awsS3WriteCommitPrefix+state.Bucket, &commitWriteState) + if err == nil && previousState.IsEmpty() && + (state.LastModified.Before(commitWriteState.Time) || state.LastModified.Equal(commitWriteState.Time)) { + return true + } + + // we have no previous state or the previous state + // is not stored: refresh the state + if previousState.IsEmpty() || (!previousState.Stored && !previousState.Error) { + s.Update(state, "") + } + + return false +} + +func (s *states) Delete(id string) { + s.Lock() + defer s.Unlock() + + index := s.findPrevious(id) + if index >= 0 { + last := len(s.states) - 1 + s.states[last], s.states[index] = s.states[index], s.states[last] + s.states = s.states[:last] + + s.idx = map[string]int{} + for i, state := range s.states { + s.idx[state.ID] = i + } + } +} + +// IsListingFullyStored check if listing if fully stored +// After first time the condition is met it will always return false +func (s *states) IsListingFullyStored(listingID string) bool { + info, _ := s.listingInfo.Load(listingID) + listingInfo := info.(*listingInfo) + if listingInfo.finalCheck { + return false + } + + listingInfo.finalCheck = (listingInfo.storedObjects + listingInfo.errorObjects) == listingInfo.totObjects + return listingInfo.finalCheck +} + +// AddListing add listing info +func (s *states) AddListing(listingID string, listingInfo *listingInfo) { + s.Lock() + defer s.Unlock() + s.listingIDs[listingID] = struct{}{} + s.listingInfo.Store(listingID, listingInfo) +} + +// DeleteListing delete listing info +func (s *states) DeleteListing(listingID string) { + s.Lock() + defer s.Unlock() + delete(s.listingIDs, listingID) + delete(s.statesByListingID, listingID) + s.listingInfo.Delete(listingID) +} + +// Update updates a state. 
If previous state didn't exist, new one is created +func (s *states) Update(newState state, listingID string) { + s.Lock() + defer s.Unlock() + + id := newState.Bucket + newState.Key + index := s.findPrevious(id) + + if index >= 0 { + s.states[index] = newState + } else { + // No existing state found, add new one + s.idx[id] = len(s.states) + s.states = append(s.states, newState) + s.log.Debug("input", "New state added for %s", newState.ID) + } + + if listingID == "" || (!newState.Stored && !newState.Error) { + return + } + + // here we increase the number of stored object + info, _ := s.listingInfo.Load(listingID) + listingInfo := info.(*listingInfo) + if newState.Stored { + listingInfo.storedObjects++ + } + + if newState.Error { + listingInfo.errorObjects++ + } + + if _, ok := s.statesByListingID[listingID]; !ok { + s.statesByListingID[listingID] = make([]state, 0) + } + + s.statesByListingID[listingID] = append(s.statesByListingID[listingID], newState) +} + +// FindPrevious lookups a registered state, that matching the new state. +// Returns a zero-state if no match is found. +func (s *states) FindPrevious(newState state) state { + s.RLock() + defer s.RUnlock() + id := newState.Bucket + newState.Key + i := s.findPrevious(id) + if i < 0 { + return state{} + } + return s.states[i] +} + +// FindPreviousByID lookups a registered state, that matching the id. +// Returns a zero-state if no match is found. +func (s *states) FindPreviousByID(id string) state { + s.RLock() + defer s.RUnlock() + i := s.findPrevious(id) + if i < 0 { + return state{} + } + return s.states[i] +} + +func (s *states) IsNew(state state) bool { + s.RLock() + defer s.RUnlock() + id := state.Bucket + state.Key + i := s.findPrevious(id) + + if i < 0 { + return true + } + + return !s.states[i].IsEqual(&state) +} + +// findPrevious returns the previous state for the file. +// In case no previous state exists, index -1 is returned +func (s *states) findPrevious(id string) int { + if i, exists := s.idx[id]; exists { + return i + } + return -1 +} + +// GetStates creates copy of the file states. +func (s *states) GetStates() []state { + s.RLock() + defer s.RUnlock() + + newStates := make([]state, len(s.states)) + copy(newStates, s.states) + + return newStates +} + +// GetListingIDs return a of the listing IDs +func (s *states) GetListingIDs() []string { + s.RLock() + defer s.RUnlock() + listingIDs := make([]string, 0, len(s.listingIDs)) + for listingID := range s.listingIDs { + listingIDs = append(listingIDs, listingID) + } + + return listingIDs +} + +// GetStatesByListingID return a copy of the states by listing ID +func (s *states) GetStatesByListingID(listingID string) []state { + s.RLock() + defer s.RUnlock() + + if _, ok := s.statesByListingID[listingID]; !ok { + return nil + } + + newStates := make([]state, len(s.statesByListingID[listingID])) + copy(newStates, s.statesByListingID[listingID]) + return newStates +} + +func (s *states) readStatesFrom(store *statestore.Store) error { + var states []state + + err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(key, awsS3ObjectStatePrefix) { + return true, nil + } + + // try to decode. Ignore faulty/incompatible values. + var st state + if err := dec.Decode(&st); err != nil { + // XXX: Do we want to log here? In case we start to store other + // state types in the registry, then this operation will likely fail + // quite often, producing some false-positives in the logs... 
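// For now a faulty entry is simply skipped, so one unreadable value cannot
// stop the rest of the stored states from loading.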
+ return true, nil + } + + st.ID = key[len(awsS3ObjectStatePrefix):] + states = append(states, st) + return true, nil + }) + + if err != nil { + return err + } + + states = fixStates(states) + + for _, state := range states { + s.Update(state, "") + } + + return nil +} + +// fixStates cleans up the registry states when updating from an older version +// of filebeat potentially writing invalid entries. +func fixStates(states []state) []state { + if len(states) == 0 { + return states + } + + // we use a map of states here, so to identify and merge duplicate entries. + idx := map[string]*state{} + for i := range states { + state := &states[i] + + old, exists := idx[state.ID] + if !exists { + idx[state.ID] = state + } else { + mergeStates(old, state) // overwrite the entry in 'old' + } + } + + if len(idx) == len(states) { + return states + } + + i := 0 + newStates := make([]state, len(idx)) + for _, state := range idx { + newStates[i] = *state + i++ + } + return newStates +} + +// mergeStates merges 2 states by trying to determine the 'newer' state. +// The st state is overwritten with the updated fields. +func mergeStates(st, other *state) { + // update file meta-data. As these are updated concurrently by the + // inputs, select the newer state based on the update timestamp. + if st.LastModified.Before(other.LastModified) { + st.LastModified = other.LastModified + } +} + +func (s *states) writeStates(store *statestore.Store) error { + for _, state := range s.GetStates() { + key := awsS3ObjectStatePrefix + state.ID + if err := store.Set(key, state); err != nil { + return err + } + } + return nil +} diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go new file mode 100644 index 000000000000..c99e7c0889b3 --- /dev/null +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -0,0 +1,153 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
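// Note for the tests below: Delete looks a state up by the in-memory index
// key, which is just Bucket+Key, while the persisted state.ID also appends the
// etag and LastModified.String(); both forms appear in the expectations.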
+ +package awss3 + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" +) + +var inputCtx = v2.Context{ + Logger: logp.NewLogger("test"), + Cancelation: context.Background(), +} + +func TestStatesDelete(t *testing.T) { + type stateTestCase struct { + states func() *states + deleteID string + expected []state + } + + lastModified := time.Date(2021, time.July, 22, 18, 38, 00, 0, time.UTC) + tests := map[string]stateTestCase{ + "delete empty states": { + states: func() *states { + return newStates(inputCtx) + }, + deleteID: "an id", + expected: []state{}, + }, + "delete not existing state": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key", "etag", lastModified), "") + return states + }, + deleteID: "an id", + expected: []state{ + { + ID: "bucketkeyetag" + lastModified.String(), + Bucket: "bucket", + Key: "key", + Etag: "etag", + LastModified: lastModified, + }, + }, + }, + "delete only one existing": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key", "etag", lastModified), "") + return states + }, + deleteID: "bucketkey", + expected: []state{}, + }, + "delete first": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key1", "etag1", lastModified), "") + states.Update(newState("bucket", "key2", "etag2", lastModified), "") + states.Update(newState("bucket", "key3", "etag3", lastModified), "") + return states + }, + deleteID: "bucketkey1", + expected: []state{ + { + ID: "bucketkey3etag3" + lastModified.String(), + Bucket: "bucket", + Key: "key3", + Etag: "etag3", + LastModified: lastModified, + }, + { + ID: "bucketkey2etag2" + lastModified.String(), + Bucket: "bucket", + Key: "key2", + Etag: "etag2", + LastModified: lastModified, + }, + }, + }, + "delete last": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key1", "etag1", lastModified), "") + states.Update(newState("bucket", "key2", "etag2", lastModified), "") + states.Update(newState("bucket", "key3", "etag3", lastModified), "") + return states + }, + deleteID: "bucketkey3", + expected: []state{ + { + ID: "bucketkey1etag1" + lastModified.String(), + Bucket: "bucket", + Key: "key1", + Etag: "etag1", + LastModified: lastModified, + }, + { + ID: "bucketkey2etag2" + lastModified.String(), + Bucket: "bucket", + Key: "key2", + Etag: "etag2", + LastModified: lastModified, + }, + }, + }, + "delete any": { + states: func() *states { + states := newStates(inputCtx) + states.Update(newState("bucket", "key1", "etag1", lastModified), "") + states.Update(newState("bucket", "key2", "etag2", lastModified), "") + states.Update(newState("bucket", "key3", "etag3", lastModified), "") + return states + }, + deleteID: "bucketkey2", + expected: []state{ + { + ID: "bucketkey1etag1" + lastModified.String(), + Bucket: "bucket", + Key: "key1", + Etag: "etag1", + LastModified: lastModified, + }, + { + ID: "bucketkey3etag3" + lastModified.String(), + Bucket: "bucket", + Key: "key3", + Etag: "etag3", + LastModified: lastModified, + }, + }, + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + states := test.states() + states.Delete(test.deleteID) + assert.Equal(t, test.expected, states.GetStates()) + }) + } +} diff --git a/x-pack/filebeat/input/default-inputs/inputs.go 
b/x-pack/filebeat/input/default-inputs/inputs.go index a3381cb42d0b..7fc3737e37da 100644 --- a/x-pack/filebeat/input/default-inputs/inputs.go +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -30,6 +30,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 http_endpoint.Plugin(), httpjson.Plugin(log, store), o365audit.Plugin(log, store), - awss3.Plugin(), + awss3.Plugin(store), } } diff --git a/x-pack/filebeat/module/aws/_meta/docs.asciidoc b/x-pack/filebeat/module/aws/_meta/docs.asciidoc index 4cd9482486a1..2e90084b1f05 100644 --- a/x-pack/filebeat/module/aws/_meta/docs.asciidoc +++ b/x-pack/filebeat/module/aws/_meta/docs.asciidoc @@ -10,9 +10,14 @@ beta[] This is a module for aws logs. It uses filebeat s3 input to get log files from -AWS S3 buckets with SQS notification. This module supports reading s3 server -access logs with `s3access` fileset, ELB access logs with `elb` fileset, VPC -flow logs with `vpcflow` fileset, and CloudTrail logs with `cloudtrail` fileset. +AWS S3 buckets with SQS notification or directly polling list of S3 objects in an S3 bucket. +The use of SQS notification is preferred: polling list of S3 objects is expensive +in terms of performance and costs, and cannot scale horizontally without ingestion duplication, +and should be preferably used only when no SQS notification can be attached to the S3 buckets. + +This module supports reading S3 server access logs with `s3access` fileset, +ELB access logs with `elb` fileset, VPC flow logs with `vpcflow` fileset, +and CloudTrail logs with `cloudtrail` fileset. Access logs contain detailed information about the requests made to these services. VPC flow logs captures information about the IP traffic going to and @@ -39,6 +44,9 @@ Example config: cloudtrail: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -53,6 +61,9 @@ Example config: cloudwatch: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -67,6 +78,9 @@ Example config: ec2: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -81,6 +95,9 @@ Example config: elb: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -95,6 +112,9 @@ Example config: s3access: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -109,6 +129,9 @@ Example 
config: vpcflow: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue + #var.bucket: 'arn:aws:s3:::mybucket' + #var.bucket_list_interval: 300s + #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials #var.credential_profile_name: fb-aws #var.access_key_id: access_key_id @@ -123,7 +146,7 @@ Example config: *`var.queue_url`*:: -(Required) AWS SQS queue url. +AWS SQS queue url (Required when `var.bucket` is not set). *`var.visibility_timeout`*:: @@ -134,6 +157,19 @@ Default to be 300 seconds. Maximum duration before AWS API request will be interrupted. Default to be 120 seconds. +*`var.bucket`*:: + +AWS S3 bucket ARN (Required when `var.queue_url` is not set). + +*`var.number_of_workers`*:: + +Number of workers that will process the S3 objects listed (Required when `var.bucket` is set). +Use to vertically scale the input. + +*`var.bucket_list_interval`*:: + +Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. + *`var.endpoint`*:: Custom endpoint used to access AWS APIs. diff --git a/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml b/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml index 4daf262994bf..78f068225906 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml @@ -1,5 +1,18 @@ type: aws-s3 +{{ if .queue_url }} queue_url: {{ .queue_url }} +{{ end }} +{{ if .bucket }} +bucket: {{ .bucket }} +{{ end }} + +{{ if .number_of_workers }} +number_of_workers: {{ .number_of_workers }} +{{ end }} + +{{ if .bucket_list_interval }} +bucket_list_interval: {{ .bucket_list_interval }} +{{ end }} file_selectors: {{ if .process_cloudtrail_logs }} - regex: '/CloudTrail/' diff --git a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml index 1903ee34f251..9d40124d846a 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml @@ -4,6 +4,9 @@ var: - name: input default: aws-s3 - name: queue_url + - name: bucket + - name: number_of_workers + - name: bucket_list_interval - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml b/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml index b0fb5feed0c5..e960abcffe5f 100644 --- a/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml @@ -1,5 +1,18 @@ type: aws-s3 +{{ if .queue_url }} queue_url: {{ .queue_url }} +{{ end }} +{{ if .bucket }} +bucket: {{ .bucket }} +{{ end }} + +{{ if .number_of_workers }} +number_of_workers: {{ .number_of_workers }} +{{ end }} + +{{ if .bucket_list_interval }} +bucket_list_interval: {{ .bucket_list_interval }} +{{ end }} {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} diff --git a/x-pack/filebeat/module/aws/cloudwatch/manifest.yml b/x-pack/filebeat/module/aws/cloudwatch/manifest.yml index 84f672107c63..415d3f42718b 100644 --- a/x-pack/filebeat/module/aws/cloudwatch/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudwatch/manifest.yml @@ -4,6 +4,9 @@ var: - name: input default: aws-s3 - name: queue_url + - name: bucket + - name: number_of_workers + - name: bucket_list_interval - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git 
a/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml b/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml index b0fb5feed0c5..e960abcffe5f 100644 --- a/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml @@ -1,5 +1,18 @@ type: aws-s3 +{{ if .queue_url }} queue_url: {{ .queue_url }} +{{ end }} +{{ if .bucket }} +bucket: {{ .bucket }} +{{ end }} + +{{ if .number_of_workers }} +number_of_workers: {{ .number_of_workers }} +{{ end }} + +{{ if .bucket_list_interval }} +bucket_list_interval: {{ .bucket_list_interval }} +{{ end }} {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} diff --git a/x-pack/filebeat/module/aws/ec2/manifest.yml b/x-pack/filebeat/module/aws/ec2/manifest.yml index 84f672107c63..415d3f42718b 100644 --- a/x-pack/filebeat/module/aws/ec2/manifest.yml +++ b/x-pack/filebeat/module/aws/ec2/manifest.yml @@ -4,6 +4,9 @@ var: - name: input default: aws-s3 - name: queue_url + - name: bucket + - name: number_of_workers + - name: bucket_list_interval - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/elb/config/aws-s3.yml b/x-pack/filebeat/module/aws/elb/config/aws-s3.yml index b0fb5feed0c5..e960abcffe5f 100644 --- a/x-pack/filebeat/module/aws/elb/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/elb/config/aws-s3.yml @@ -1,5 +1,18 @@ type: aws-s3 +{{ if .queue_url }} queue_url: {{ .queue_url }} +{{ end }} +{{ if .bucket }} +bucket: {{ .bucket }} +{{ end }} + +{{ if .number_of_workers }} +number_of_workers: {{ .number_of_workers }} +{{ end }} + +{{ if .bucket_list_interval }} +bucket_list_interval: {{ .bucket_list_interval }} +{{ end }} {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} diff --git a/x-pack/filebeat/module/aws/elb/manifest.yml b/x-pack/filebeat/module/aws/elb/manifest.yml index 735591632349..128fcbf735e7 100644 --- a/x-pack/filebeat/module/aws/elb/manifest.yml +++ b/x-pack/filebeat/module/aws/elb/manifest.yml @@ -4,6 +4,9 @@ var: - name: input default: aws-s3 - name: queue_url + - name: bucket + - name: number_of_workers + - name: bucket_list_interval - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml b/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml index b0fb5feed0c5..e960abcffe5f 100644 --- a/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml @@ -1,5 +1,18 @@ type: aws-s3 +{{ if .queue_url }} queue_url: {{ .queue_url }} +{{ end }} +{{ if .bucket }} +bucket: {{ .bucket }} +{{ end }} + +{{ if .number_of_workers }} +number_of_workers: {{ .number_of_workers }} +{{ end }} + +{{ if .bucket_list_interval }} +bucket_list_interval: {{ .bucket_list_interval }} +{{ end }} {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} diff --git a/x-pack/filebeat/module/aws/s3access/manifest.yml b/x-pack/filebeat/module/aws/s3access/manifest.yml index 84f672107c63..415d3f42718b 100644 --- a/x-pack/filebeat/module/aws/s3access/manifest.yml +++ b/x-pack/filebeat/module/aws/s3access/manifest.yml @@ -4,6 +4,9 @@ var: - name: input default: aws-s3 - name: queue_url + - name: bucket + - name: number_of_workers + - name: bucket_list_interval - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git 
a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index 8fb86aee8725..f11ffcbc1e0f 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -1,7 +1,20 @@ {{ if eq .input "aws-s3" }} type: aws-s3 +{{ if .queue_url }} queue_url: {{ .queue_url }} +{{ end }} +{{ if .bucket }} +bucket: {{ .bucket }} +{{ end }} + +{{ if .number_of_workers }} +number_of_workers: {{ .number_of_workers }} +{{ end }} + +{{ if .bucket_list_interval }} +bucket_list_interval: {{ .bucket_list_interval }} +{{ end }} {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} diff --git a/x-pack/filebeat/module/aws/vpcflow/manifest.yml b/x-pack/filebeat/module/aws/vpcflow/manifest.yml index 0c2ec0f7e1b4..d3122493b8cd 100644 --- a/x-pack/filebeat/module/aws/vpcflow/manifest.yml +++ b/x-pack/filebeat/module/aws/vpcflow/manifest.yml @@ -4,6 +4,9 @@ var: - name: input default: aws-s3 - name: queue_url + - name: bucket + - name: number_of_workers + - name: bucket_list_interval - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/libbeat/common/aws/credentials.go b/x-pack/libbeat/common/aws/credentials.go index 662da9d570a6..4ca498e1aa52 100644 --- a/x-pack/libbeat/common/aws/credentials.go +++ b/x-pack/libbeat/common/aws/credentials.go @@ -76,8 +76,10 @@ func getAccessKeys(config ConfigAWS) awssdk.Config { Value: awsCredentials, } - // Set default region to make initial aws api call - awsConfig.Region = "us-east-1" + // Set default region if empty to make initial aws api call + if awsConfig.Region == "" { + awsConfig.Region = "us-east-1" + } // Assume IAM role if iam_role config parameter is given if config.RoleArn != "" { @@ -112,8 +114,10 @@ func getSharedCredentialProfile(config ConfigAWS) (awssdk.Config, error) { return awsConfig, errors.Wrap(err, "external.LoadDefaultAWSConfig failed with shared credential profile given") } - // Set default region to make initial aws api call - awsConfig.Region = "us-east-1" + // Set default region if empty to make initial aws api call + if awsConfig.Region == "" { + awsConfig.Region = "us-east-1" + } // Assume IAM role if iam_role config parameter is given if config.RoleArn != "" {
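As a closing illustration of the deduplication-friendly event IDs introduced in s3_objects.go above, here is a minimal sketch using the helpers shown earlier; the hash literal and the wrapper function are illustrative placeholders, not part of the patch:

// exampleEventID sketches how per-event IDs are formed: s3ObjectHash yields
// the first 10 hex characters of sha256(bucket ARN + object key), and
// objectID appends the zero-padded byte offset of the line or JSON item.
func exampleEventID() string {
	hash := "aabbccddee"        // placeholder for a real s3ObjectHash value
	return objectID(hash, 4096) // "aabbccddee-000000004096"
}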