diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c61d7fba8476..b86f3ce5ec99 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - cisco/asa fileset: Fix parsing of 302021 message code. {pull}14519[14519] - Fix filebeat azure dashboards, event category should be `Alert`. {pull}14668[14668] +- Fix s3 input hanging with GetObjectRequest API call by adding context_timeout config. {issue}15502[15502] {pull}15590[15590] - Fix typos in zeek notice fileset config file. {issue}15764[15764] {pull}15765[15765] - Add shared_credential_file to cloudtrail config {issue}15652[15652] {pull}15656[15656] diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 2a70247f23a0..b7f33a5ceb97 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -45,13 +45,21 @@ URL of the AWS SQS queue that messages will be received from. Required. [float] ==== `visibility_timeout` -The duration (in seconds) that the received messages are hidden from subsequent +The duration that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. -This value needs to be a lot bigger than filebeat collection frequency so +This value needs to be a lot bigger than {beatname_uc} collection frequency so if it took too long to read the s3 log, this sqs message will not be reprocessed. The default visibility timeout for a message is 300 seconds. The minimum is 0 seconds. The maximum is 12 hours. +[float] +==== `api_timeout` + +The maximum duration of AWS API can take. If it exceeds the timeout, AWS API +will be interrupted. +The default AWS API timeout for a message is 120 seconds. The minimum +is 0 seconds. The maximum is half of the visibility timeout value. + [float] ==== `aws credentials` diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index cae4c88ee693..27564caea7c7 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -93,6 +93,14 @@ filebeat.modules: # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + elb: enabled: false @@ -108,6 +116,14 @@ filebeat.modules: # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + #-------------------------------- Azure Module -------------------------------- - module: azure # All logs diff --git a/x-pack/filebeat/input/s3/config.go b/x-pack/filebeat/input/s3/config.go index 7b79022371d4..e8d6a3ef2be4 100644 --- a/x-pack/filebeat/input/s3/config.go +++ b/x-pack/filebeat/input/s3/config.go @@ -17,6 +17,7 @@ type config struct { QueueURL string `config:"queue_url" validate:"nonzero,required"` VisibilityTimeout time.Duration `config:"visibility_timeout"` AwsConfig awscommon.ConfigAWS `config:",inline"` + APITimeout time.Duration `config:"api_timeout"` } func defaultConfig() config { @@ -25,6 +26,7 @@ func defaultConfig() config { Type: "s3", }, VisibilityTimeout: 300 * time.Second, + APITimeout: 120 * time.Second, } } @@ -33,5 +35,9 @@ func (c *config) Validate() error { return fmt.Errorf("visibility timeout %v is not within the "+ "required range 0s to 12h", c.VisibilityTimeout) } + if c.APITimeout < 0 || c.APITimeout > c.VisibilityTimeout/2 { + return fmt.Errorf("api timeout %v needs to be larger than"+ + " 0s and smaller than half of the visibility timeout", c.APITimeout) + } return nil } diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 2d83f9e4a182..0133f085735e 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -57,7 +57,7 @@ func init() { } } -// Input is a input for s3 +// s3Input is a input for s3 type s3Input struct { outlet channel.Outleter // Output of received s3 logs. config config @@ -171,6 +171,9 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) + p.logger.Infof("visibility timeout is set to %v seconds", visibilityTimeout) + p.logger.Infof("aws api timeout is set to %v", p.config.APITimeout) + regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { p.logger.Errorf("failed to get region name from queueURL: %v", p.config.QueueURL) @@ -198,7 +201,7 @@ func (p *s3Input) run(svcSQS sqsiface.ClientAPI, svcS3 s3iface.ClientAPI, visibi if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled { continue } - p.logger.Error("failed to receive message from SQS:", err) + p.logger.Error("failed to receive message from SQS: ", err) time.Sleep(time.Duration(waitTimeSecond) * time.Second) continue } @@ -218,7 +221,6 @@ func (p *s3Input) Stop() { p.stopOnce.Do(func() { defer p.outlet.Close() close(p.close) - p.context.Done() p.logger.Info("Stopping s3 input") }) } @@ -232,6 +234,7 @@ func (p *s3Input) Wait() { func (p *s3Input) processor(queueURL string, messages []sqs.Message, visibilityTimeout int64, svcS3 s3iface.ClientAPI, svcSQS sqsiface.ClientAPI) { var wg sync.WaitGroup numMessages := len(messages) + p.logger.Debugf("Processing %v messages", numMessages) wg.Add(numMessages * 2) // process messages received from sqs @@ -251,14 +254,16 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w p.logger.Error(errors.Wrap(err, "handleSQSMessage failed")) return } + p.logger.Debugf("handleSQSMessage succeed and returned %v sets of S3 log info", len(s3Infos)) // read from s3 object and create event for each log line err = p.handleS3Objects(svcS3, s3Infos, errC) if err != nil { err = errors.Wrap(err, "handleS3Objects failed") p.logger.Error(err) - errC <- err + return } + p.logger.Debugf("handleS3Objects succeed") } func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Message, queueURL string, visibilityTimeout int64, wg *sync.WaitGroup, errC chan error) { @@ -269,12 +274,12 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess return case err := <-errC: if err != nil { - p.logger.Warnf("Processing message failed: %v", err) + p.logger.Warn("Processing message failed, updating visibility timeout") err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { p.logger.Error(errors.Wrap(err, "change message visibility failed")) } - p.logger.Warnf("Message visibility timeout updated to %v", visibilityTimeout) + p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout) } else { // When ACK done, message will be deleted. Or when message is // not s3 ObjectCreated event related(handleSQSMessage function @@ -288,13 +293,14 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess } return case <-time.After(time.Duration(visibilityTimeout/2) * time.Second): + p.logger.Warn("Half of the set visibilityTimeout passed, visibility timeout needs to be updated") // If half of the set visibilityTimeout passed and this is // still ongoing, then change visibility timeout. err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { p.logger.Error(errors.Wrap(err, "change message visibility failed")) } - p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout) + p.logger.Infof("Message visibility timeout updated to %v seconds", visibilityTimeout) } } } @@ -310,7 +316,11 @@ func (p *s3Input) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeout in WaitTimeSeconds: &waitTimeSecond, }) - return req.Send(p.context) + // The Context will interrupt the request if the timeout expires. + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + return req.Send(ctx) } func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int64, svcSQS sqsiface.ClientAPI, receiptHandle *string) error { @@ -319,7 +329,12 @@ func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int VisibilityTimeout: &visibilityTimeout, ReceiptHandle: receiptHandle, }) - _, err := req.Send(p.context) + + // The Context will interrupt the request if the timeout expires. + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + _, err := req.Send(ctx) return err } @@ -358,111 +373,107 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) { } func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC chan error) error { - s3Context := &s3Context{ + s3Ctx := &s3Context{ refs: 1, errC: errC, } - defer s3Context.done() - - for _, s3Info := range s3Infos { - objectHash := s3ObjectHash(s3Info) + defer s3Ctx.done() - // read from s3 object - reader, err := p.newS3BucketReader(svc, s3Info) + for _, info := range s3Infos { + err := p.createEventsFromS3Info(svc, info, s3Ctx) if err != nil { - return errors.Wrap(err, "newS3BucketReader failed") - } - if reader == nil { - continue - } - - offset := 0 - for { - log, err := reader.ReadString('\n') - if log == "" { - break - } - - if err != nil { - if err == io.EOF { - // create event for last line - offset += len([]byte(log)) - event := createEvent(log, offset, s3Info, objectHash, s3Context) - err = p.forwardEvent(event) - if err != nil { - err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.Fail(err) - return err - } - return nil - } - return errors.Wrapf(err, "ReadString failed for %v", s3Info.key) - } - - // create event per log line - offset += len([]byte(log)) - event := createEvent(log, offset, s3Info, objectHash, s3Context) - err = p.forwardEvent(event) - if err != nil { - err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.Fail(err) - return err - } + err = errors.Wrapf(err, "createEventsFromS3Info failed for %v", info.key) + p.logger.Error(err) + s3Ctx.setError(err) } } - return nil } -func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufio.Reader, error) { +func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3Ctx *s3Context) error { + objectHash := s3ObjectHash(info) + + // Download the S3 object using GetObjectRequest. s3GetObjectInput := &s3.GetObjectInput{ - Bucket: awssdk.String(s3Info.name), - Key: awssdk.String(s3Info.key), + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), } req := svc.GetObjectRequest(s3GetObjectInput) - resp, err := req.Send(p.context) + // The Context will interrupt the request if the timeout expires. + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + resp, err := req.Send(ctx) if err != nil { if awsErr, ok := err.(awserr.Error); ok { + // If the SDK can determine the request or retry delay was canceled + // by a context the ErrCodeRequestCanceled error will be returned. if awsErr.Code() == awssdk.ErrCodeRequestCanceled { - return nil, nil + err = errors.Wrap(err, "GetObject request canceled") + p.logger.Error(err) + return err } if awsErr.Code() == "NoSuchKey" { - p.logger.Warn("Cannot find s3 file with key ", s3Info.key) - return nil, nil + p.logger.Warn("Cannot find s3 file") + return nil } } - return nil, errors.Wrapf(err, "s3 get object request failed %v", s3Info.key) + return errors.Wrap(err, "s3 get object request failed") } - if resp.Body == nil { - return nil, errors.New("s3 get object response body is empty") - } + defer resp.Body.Close() + reader := bufio.NewReader(resp.Body) // Check content-type - if resp.ContentType != nil { - switch *resp.ContentType { - case "application/x-gzip": - reader, err := gzip.NewReader(resp.Body) + if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") { + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + err = errors.Wrap(err, "gzip.NewReader failed") + p.logger.Error(err) + return err + } + reader = bufio.NewReader(gzipReader) + gzipReader.Close() + } + + // handle s3 objects that are not json content-type + offset := 0 + for { + log, err := reader.ReadString('\n') + if log == "" { + break + } + + if err == io.EOF { + // create event for last line + offset += len([]byte(log)) + event := createEvent(log, offset, info, objectHash, s3Ctx) + err = p.forwardEvent(event) if err != nil { - return nil, errors.Wrapf(err, "Failed to decompress gzipped file %v", s3Info.key) + err = errors.Wrap(err, "forwardEvent failed") + p.logger.Error(err) + return err } - return bufio.NewReader(reader), nil - default: - return bufio.NewReader(resp.Body), nil + return nil + } else if err != nil { + err = errors.Wrap(err, "ReadString failed") + p.logger.Error(err) + return err } - } - // If there is no content-type, check file name instead. - if strings.HasSuffix(s3Info.key, ".gz") { - gzipReader, err := gzip.NewReader(resp.Body) + // create event per log line + offset += len([]byte(log)) + event := createEvent(log, offset, info, objectHash, s3Ctx) + err = p.forwardEvent(event) if err != nil { - return nil, errors.Wrapf(err, "Failed to decompress gzipped file %v", s3Info.key) + err = errors.Wrap(err, "forwardEvent failed") + p.logger.Error(err) + return err } - return bufio.NewReader(gzipReader), nil } - return bufio.NewReader(resp.Body), nil + return nil } func (p *s3Input) forwardEvent(event beat.Event) error { @@ -480,7 +491,12 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s } req := svcSQS.DeleteMessageRequest(deleteMessageInput) - _, err := req.Send(p.context) + + // The Context will interrupt the request if the timeout expires. + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + _, err := req.Send(ctx) if err != nil { if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled { return nil @@ -490,33 +506,33 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s return nil } -func createEvent(log string, offset int, s3Info s3Info, objectHash string, s3Context *s3Context) beat.Event { +func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { f := common.MapStr{ "message": log, "log": common.MapStr{ "offset": int64(offset), - "file.path": constructObjectURL(s3Info), + "file.path": constructObjectURL(info), }, "aws": common.MapStr{ "s3": common.MapStr{ "bucket": common.MapStr{ - "name": s3Info.name, - "arn": s3Info.arn}, - "object.key": s3Info.key, + "name": info.name, + "arn": info.arn}, + "object.key": info.key, }, }, "cloud": common.MapStr{ "provider": "aws", - "region": s3Info.region, + "region": info.region, }, } - s3Context.Inc() + s3Ctx.Inc() return beat.Event{ Timestamp: time.Now(), Fields: f, Meta: common.MapStr{"id": objectHash + "-" + fmt.Sprintf("%012d", offset)}, - Private: s3Context, + Private: s3Ctx, } } @@ -532,11 +548,6 @@ func s3ObjectHash(s3Info s3Info) string { return prefix[:10] } -func (c *s3Context) Fail(err error) { - c.setError(err) - c.done() -} - func (c *s3Context) setError(err error) { // only care about the last error for now // TODO: add "Typed" error to error for context diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 9290b3f664d3..62d93a66e5fe 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -5,7 +5,9 @@ package s3 import ( + "bufio" "bytes" + "context" "fmt" "io" "io/ioutil" @@ -26,16 +28,10 @@ type MockS3Client struct { s3iface.ClientAPI } -// MockS3ClientErr struct is used for unit tests. -type MockS3ClientErr struct { - s3iface.ClientAPI -} - var ( s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png \n" s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png \n" mockSvc = &MockS3Client{} - mockSvcErr = &MockS3ClientErr{} info = s3Info{ name: "test-s3-ks", key: "log2019-06-21-16-16-54", @@ -56,16 +52,6 @@ func (m *MockS3Client) GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRe } } -func (m *MockS3ClientErr) GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRequest { - httpReq, _ := http.NewRequest("", "", nil) - return s3.GetObjectRequest{ - Request: &awssdk.Request{ - Data: &s3.GetObjectOutput{}, - HTTPRequest: httpReq, - }, - } -} - func TestGetRegionFromQueueURL(t *testing.T) { queueURL := "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs" regionName, err := getRegionFromQueueURL(queueURL) @@ -150,8 +136,22 @@ func TestHandleMessage(t *testing.T) { func TestNewS3BucketReader(t *testing.T) { p := &s3Input{context: &channelContext{}} - reader, err := p.newS3BucketReader(mockSvc, info) + s3GetObjectInput := &s3.GetObjectInput{ + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), + } + req := mockSvc.GetObjectRequest(s3GetObjectInput) + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + resp, err := req.Send(ctx) assert.NoError(t, err) + reader := bufio.NewReader(resp.Body) + defer resp.Body.Close() + for i := 0; i < 3; i++ { switch i { case 0: @@ -170,13 +170,6 @@ func TestNewS3BucketReader(t *testing.T) { } } -func TestNewS3BucketReaderErr(t *testing.T) { - p := &s3Input{context: &channelContext{}} - reader, err := p.newS3BucketReader(mockSvcErr, info) - assert.Error(t, err, "s3 get object response body is empty") - assert.Nil(t, reader) -} - func TestCreateEvent(t *testing.T) { p := &s3Input{context: &channelContext{}} errC := make(chan error) @@ -194,8 +187,22 @@ func TestCreateEvent(t *testing.T) { } s3ObjectHash := s3ObjectHash(s3Info) - reader, err := p.newS3BucketReader(mockSvc, s3Info) + s3GetObjectInput := &s3.GetObjectInput{ + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), + } + req := mockSvc.GetObjectRequest(s3GetObjectInput) + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + resp, err := req.Send(ctx) assert.NoError(t, err) + reader := bufio.NewReader(resp.Body) + defer resp.Body.Close() + var events []beat.Event for { log, err := reader.ReadString('\n') diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index 3823c9746109..2cdfad24299e 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -14,6 +14,14 @@ # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + elb: enabled: false @@ -28,3 +36,11 @@ # Profile name for aws credential # If not set the default profile is used #var.credential_profile_name: fb-aws + + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s diff --git a/x-pack/filebeat/module/aws/elb/config/s3.yml b/x-pack/filebeat/module/aws/elb/config/s3.yml index c4c151708b92..40212723e968 100644 --- a/x-pack/filebeat/module/aws/elb/config/s3.yml +++ b/x-pack/filebeat/module/aws/elb/config/s3.yml @@ -8,3 +8,11 @@ credential_profile_name: {{ .credential_profile_name }} {{ if .shared_credential_file }} shared_credential_file: {{ .shared_credential_file }} {{ end }} + +{{ if .visibility_timeout }} +visibility_timeout: {{ .visibility_timeout }} +{{ end }} + +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} +{{ end }} diff --git a/x-pack/filebeat/module/aws/elb/manifest.yml b/x-pack/filebeat/module/aws/elb/manifest.yml index 9bf88b4703e2..03b362a28711 100644 --- a/x-pack/filebeat/module/aws/elb/manifest.yml +++ b/x-pack/filebeat/module/aws/elb/manifest.yml @@ -7,6 +7,10 @@ var: default: ~/.aws/credentials - name: credential_profile_name default: test + - name: visibility_timeout + default: 300s + - name: api_timeout + default: 120s ingest_pipeline: ingest/pipeline.yml input: config/{{.input}}.yml diff --git a/x-pack/filebeat/module/aws/s3access/config/s3.yml b/x-pack/filebeat/module/aws/s3access/config/s3.yml index c4c151708b92..40212723e968 100644 --- a/x-pack/filebeat/module/aws/s3access/config/s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/s3.yml @@ -8,3 +8,11 @@ credential_profile_name: {{ .credential_profile_name }} {{ if .shared_credential_file }} shared_credential_file: {{ .shared_credential_file }} {{ end }} + +{{ if .visibility_timeout }} +visibility_timeout: {{ .visibility_timeout }} +{{ end }} + +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} +{{ end }} diff --git a/x-pack/filebeat/module/aws/s3access/manifest.yml b/x-pack/filebeat/module/aws/s3access/manifest.yml index 9d9bec1c3e9d..23246715b5cc 100644 --- a/x-pack/filebeat/module/aws/s3access/manifest.yml +++ b/x-pack/filebeat/module/aws/s3access/manifest.yml @@ -7,5 +7,10 @@ var: default: ~/.aws/credentials - name: credential_profile_name default: test + - name: visibility_timeout + default: 300s + - name: api_timeout + default: 120s + ingest_pipeline: ingest/pipeline.yml input: config/{{.input}}.yml diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index d628d505dd9f..e7901a14f3b4 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -17,6 +17,14 @@ # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + elb: enabled: false @@ -31,3 +39,11 @@ # Profile name for aws credential # If not set the default profile is used #var.credential_profile_name: fb-aws + + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s