Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vars in modules.d/aws.yml.disabled #27454

Merged
merged 4 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions filebeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Example config:
cloudtrail:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -66,7 +66,7 @@ Example config:
cloudwatch:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -83,7 +83,7 @@ Example config:
ec2:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -100,7 +100,7 @@ Example config:
elb:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -117,7 +117,7 @@ Example config:
s3access:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -134,7 +134,7 @@ Example config:
vpcflow:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket: 'arn:aws:s3:::mybucket'
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand Down
54 changes: 54 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Process CloudTrail logs
# default is true, set to false to skip Cloudtrail logs
# var.process_cloudtrail_logs: false
Expand Down Expand Up @@ -154,6 +163,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down Expand Up @@ -194,6 +212,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down Expand Up @@ -234,6 +261,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down Expand Up @@ -274,6 +310,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down Expand Up @@ -314,6 +359,15 @@ filebeat.modules:
# AWS SQS queue url
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue

# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

# Number of workers on S3 bucket
#var.number_of_workers: 5

# Filename of AWS credential file
# If not set "$HOME/.aws/credentials" is used on Linux/Mac
# "%UserProfile%\.aws\credentials" is used on Windows
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (a *eventACKTracker) ACK() {

// Wait waits for the number of pending ACKs to be zero.
// Wait must be called sequentially only after every expected
// Add call are made. Failing to do so could reset the pendingACKs
// `Add` calls are made. Failing to do so could reset the pendingACKs
// property to 0 and would results in Wait returning after additional
// calls to `Add` are made without a corresponding `ACK` call.
func (a *eventACKTracker) Wait() {
Expand Down
16 changes: 8 additions & 8 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type config struct {
FIPSEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url"`
Bucket string `config:"bucket"`
BucketARN string `config:"bucket_arn"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
NumberOfWorkers int `config:"number_of_workers"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
Expand All @@ -49,20 +49,20 @@ func defaultConfig() config {
}

func (c *config) Validate() error {
if c.QueueURL == "" && c.Bucket == "" {
return fmt.Errorf("queue_url or bucket must provided")
if c.QueueURL == "" && c.BucketARN == "" {
return fmt.Errorf("queue_url or bucket_arn must provided")
}

if c.QueueURL != "" && c.Bucket != "" {
return fmt.Errorf("queue_url <%v> and bucket <%v> "+
"cannot be set at the same time", c.QueueURL, c.Bucket)
if c.QueueURL != "" && c.BucketARN != "" {
return fmt.Errorf("queue_url <%v> and bucket_arn <%v> "+
"cannot be set at the same time", c.QueueURL, c.BucketARN)
}

if c.Bucket != "" && c.BucketListInterval <= 0 {
if c.BucketARN != "" && c.BucketListInterval <= 0 {
return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval)
}

if c.Bucket != "" && c.NumberOfWorkers <= 0 {
if c.BucketARN != "" && c.NumberOfWorkers <= 0 {
return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers)
}

Expand Down
22 changes: 11 additions & 11 deletions x-pack/filebeat/input/awss3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom("")))
return config{
QueueURL: quequeURL,
Bucket: s3Bucket,
BucketARN: s3Bucket,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
SQSMaxReceiveCount: 5,
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestConfig(t *testing.T) {
"",
s3Bucket,
common.MapStr{
"bucket": s3Bucket,
"bucket_arn": s3Bucket,
"number_of_workers": 5,
},
"",
Expand Down Expand Up @@ -109,21 +109,21 @@ func TestConfig(t *testing.T) {
"",
"",
common.MapStr{
"queue_url": "",
"bucket": "",
"queue_url": "",
"bucket_arn": "",
},
"queue_url or bucket must provided",
"queue_url or bucket_arn must provided",
nil,
},
{
"error on both queueURL and s3Bucket",
queueURL,
s3Bucket,
common.MapStr{
"queue_url": queueURL,
"bucket": s3Bucket,
"queue_url": queueURL,
"bucket_arn": s3Bucket,
},
"queue_url <https://example.com> and bucket <arn:aws:s3:::aBucket> cannot be set at the same time",
"queue_url <https://example.com> and bucket_arn <arn:aws:s3:::aBucket> cannot be set at the same time",
nil,
},
{
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestConfig(t *testing.T) {
"",
s3Bucket,
common.MapStr{
"bucket": s3Bucket,
"bucket_arn": s3Bucket,
"bucket_list_interval": "0",
},
"bucket_list_interval <0s> must be greater than 0",
Expand All @@ -175,7 +175,7 @@ func TestConfig(t *testing.T) {
"",
s3Bucket,
common.MapStr{
"bucket": s3Bucket,
"bucket_arn": s3Bucket,
"number_of_workers": "0",
},
"number_of_workers <0> must be greater than 0",
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestConfig(t *testing.T) {
"",
s3Bucket,
common.MapStr{
"bucket": s3Bucket,
"bucket_arn": s3Bucket,
"expand_event_list_from_field": "Records",
"content_type": "text/plain",
},
Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
}

if in.config.Bucket != "" {
if in.config.BucketARN != "" {
// Create S3 receiver and S3 notification processor.
poller, err := in.createS3Lister(inputContext, client, persistentStore, states)
if err != nil {
Expand Down Expand Up @@ -203,7 +203,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, client beat.Client, persistent
client: s3.New(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)),
}

log := ctx.Logger.With("s3_bucket", in.config.Bucket)
log := ctx.Logger.With("bucket_arn", in.config.BucketARN)
log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers)
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
Expand All @@ -223,7 +223,8 @@ func (in *s3Input) createS3Lister(ctx v2.Context, client beat.Client, persistent
s3EventHandlerFactory,
states,
persistentStore,
in.config.Bucket,
in.config.BucketARN,
in.awsConfig.Region,
in.config.NumberOfWorkers,
in.config.BucketListInterval)

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", numberOfWorkers, time.Second)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "region", numberOfWorkers, time.Second)

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func getTerraformOutputs(t *testing.T) terraformOutputData {

func makeTestConfigS3(s3bucket string) *common.Config {
return common.MustNewConfigFrom(fmt.Sprintf(`---
bucket: aws:s3:::%s
bucket_arn: aws:s3:::%s
number_of_workers: 1
file_selectors:
-
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type s3ObjectPayload struct {
type s3Poller struct {
numberOfWorkers int
bucket string
region string
bucketPollInterval time.Duration
workerSem *sem
s3 s3API
Expand All @@ -60,6 +61,7 @@ func newS3Poller(log *logp.Logger,
states *states,
store *statestore.Store,
bucket string,
awsRegion string,
numberOfWorkers int,
bucketPollInterval time.Duration) *s3Poller {
if metrics == nil {
Expand All @@ -68,6 +70,7 @@ func newS3Poller(log *logp.Logger,
return &s3Poller{
numberOfWorkers: numberOfWorkers,
bucket: bucket,
region: awsRegion,
bucketPollInterval: bucketPollInterval,
workerSem: newSem(numberOfWorkers),
s3: s3,
Expand Down Expand Up @@ -178,7 +181,9 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
p.states.Update(state, "")

event := s3EventV2{}
event.AWSRegion = p.region
event.S3.Bucket.Name = bucketName
event.S3.Bucket.ARN = p.bucket
event.S3.Object.Key = filename

acker := newEventACKTracker(ctx)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (f *s3ObjectProcessorFactory) findReaderConfig(key string) *readerConfig {
// match the S3 object key.
func (f *s3ObjectProcessorFactory) Create(ctx context.Context, log *logp.Logger, ack *eventACKTracker, obj s3EventV2) s3ObjectHandler {
log = log.With(
"s3_bucket", obj.S3.Bucket.Name,
"s3_object", obj.S3.Object.Key)
"bucket_arn", obj.S3.Bucket.Name,
"object_key", obj.S3.Object.Key)

readerConfig := f.findReaderConfig(obj.S3.Object.Key)
if readerConfig == nil {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestS3Poller(t *testing.T) {
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, numberOfWorkers, pollInterval)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "region", numberOfWorkers, pollInterval)
require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx))
assert.Equal(t, numberOfWorkers, receiver.workerSem.available)
})
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestS3Poller(t *testing.T) {
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, numberOfWorkers, pollInterval)
receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "region", numberOfWorkers, pollInterval)
require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx))
assert.Equal(t, numberOfWorkers, receiver.workerSem.available)
})
Expand Down
Loading