From f3aab646ad31c35b396250ad969c684527ab81b0 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 18 Feb 2020 07:09:18 -0700 Subject: [PATCH] [Filebeat] Add timeout to GetObjectRequest for s3 input (#15590) (#15901) * Add timeout to GetObjectRequest which will cancel the request if it takes too long * Close resp.Body from S3 GetObject API to prevent resource leak * Change aws_api_timeout to api_timeout (cherry picked from commit 86c3e6372ddaf73a4a602f774f3db7b91ffcb8f3) --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-aws-s3.asciidoc | 12 +- x-pack/filebeat/filebeat.reference.yml | 32 +++ x-pack/filebeat/input/s3/config.go | 6 + x-pack/filebeat/input/s3/input.go | 253 +++++++++--------- x-pack/filebeat/input/s3/input_test.go | 57 ++-- x-pack/filebeat/module/aws/_meta/config.yml | 32 +++ .../aws/cloudtrail/config/cloudtrail.yml | 7 + .../module/aws/cloudtrail/manifest.yml | 2 + x-pack/filebeat/module/aws/elb/config/s3.yml | 8 + x-pack/filebeat/module/aws/elb/manifest.yml | 2 + .../module/aws/s3access/config/s3.yml | 8 + .../filebeat/module/aws/s3access/manifest.yml | 2 + .../module/aws/vpcflow/config/input.yml | 8 + .../filebeat/module/aws/vpcflow/manifest.yml | 2 + x-pack/filebeat/modules.d/aws.yml.disabled | 32 +++ 16 files changed, 314 insertions(+), 150 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f9523725fd7c..262592c8efe9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -93,6 +93,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449] - netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449] - Fixed dashboard for Cisco ASA Firewall. {issue}15420[15420] {pull}15553[15553] +- Fix s3 input hanging with GetObjectRequest API call by adding context_timeout config. {issue}15502[15502] {pull}15590[15590] - Add shared_credential_file to cloudtrail config {issue}15652[15652] {pull}15656[15656] - Fix typos in zeek notice fileset config file. {issue}15764[15764] {pull}15765[15765] - Fix mapping error when zeek weird logs do not contain IP addresses. {pull}15906[15906] diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 4535a8946b07..6715b8547476 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -45,9 +45,9 @@ URL of the AWS SQS queue that messages will be received from. Required. [float] ==== `visibility_timeout` -The duration (in seconds) that the received messages are hidden from subsequent +The duration that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. -This value needs to be a lot bigger than filebeat collection frequency so +This value needs to be a lot bigger than {beatname_uc} collection frequency so if it took too long to read the s3 log, this sqs message will not be reprocessed. The default visibility timeout for a message is 300 seconds. The minimum is 0 seconds. The maximum is 12 hours. @@ -61,6 +61,14 @@ can be assigned the name of the field. This setting will be able to split the messages under the group value into separate events. For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records": +[float] +==== `api_timeout` + +The maximum duration of AWS API can take. If it exceeds the timeout, AWS API +will be interrupted. +The default AWS API timeout for a message is 120 seconds. The minimum +is 0 seconds. The maximum is half of the visibility timeout value. + ["source","json"] ---- { diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index ecac89365e07..0f99430a5ac9 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -111,6 +111,14 @@ filebeat.modules: # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + elb: enabled: false @@ -126,6 +134,14 @@ filebeat.modules: # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + vpcflow: enabled: false @@ -141,6 +157,14 @@ filebeat.modules: # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + cloudtrail: enabled: false @@ -156,6 +180,14 @@ filebeat.modules: # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + #-------------------------------- Azure Module -------------------------------- - module: azure # All logs diff --git a/x-pack/filebeat/input/s3/config.go b/x-pack/filebeat/input/s3/config.go index 758209c44bf0..7fa455713e96 100644 --- a/x-pack/filebeat/input/s3/config.go +++ b/x-pack/filebeat/input/s3/config.go @@ -18,6 +18,7 @@ type config struct { VisibilityTimeout time.Duration `config:"visibility_timeout"` AwsConfig awscommon.ConfigAWS `config:",inline"` ExpandEventListFromField string `config:"expand_event_list_from_field"` + APITimeout time.Duration `config:"api_timeout"` } func defaultConfig() config { @@ -26,6 +27,7 @@ func defaultConfig() config { Type: "s3", }, VisibilityTimeout: 300 * time.Second, + APITimeout: 120 * time.Second, } } @@ -34,5 +36,9 @@ func (c *config) Validate() error { return fmt.Errorf("visibility timeout %v is not within the "+ "required range 0s to 12h", c.VisibilityTimeout) } + if c.APITimeout < 0 || c.APITimeout > c.VisibilityTimeout/2 { + return fmt.Errorf("api timeout %v needs to be larger than"+ + " 0s and smaller than half of the visibility timeout", c.APITimeout) + } return nil } diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index eef16d918fae..dd254cffebf2 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -171,6 +171,9 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) + p.logger.Infof("visibility timeout is set to %v seconds", visibilityTimeout) + p.logger.Infof("aws api timeout is set to %v", p.config.APITimeout) + regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { p.logger.Errorf("failed to get region name from queueURL: %v", p.config.QueueURL) @@ -198,7 +201,7 @@ func (p *s3Input) run(svcSQS sqsiface.ClientAPI, svcS3 s3iface.ClientAPI, visibi if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled { continue } - p.logger.Error("failed to receive message from SQS:", err) + p.logger.Error("failed to receive message from SQS: ", err) time.Sleep(time.Duration(waitTimeSecond) * time.Second) continue } @@ -218,7 +221,6 @@ func (p *s3Input) Stop() { p.stopOnce.Do(func() { defer p.outlet.Close() close(p.close) - p.context.Done() p.logger.Info("Stopping s3 input") }) } @@ -272,12 +274,12 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess return case err := <-errC: if err != nil { - p.logger.Warnf("Processing message failed: %v", err) + p.logger.Warn("Processing message failed, updating visibility timeout") err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { p.logger.Error(errors.Wrap(err, "change message visibility failed")) } - p.logger.Warnf("Message visibility timeout updated to %v", visibilityTimeout) + p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout) } else { // When ACK done, message will be deleted. Or when message is // not s3 ObjectCreated event related(handleSQSMessage function @@ -314,7 +316,11 @@ func (p *s3Input) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeout in WaitTimeSeconds: &waitTimeSecond, }) - return req.Send(p.context) + // The Context will interrupt the request if the timeout expires. + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + return req.Send(ctx) } func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int64, svcSQS sqsiface.ClientAPI, receiptHandle *string) error { @@ -323,7 +329,12 @@ func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int VisibilityTimeout: &visibilityTimeout, ReceiptHandle: receiptHandle, }) - _, err := req.Send(p.context) + + // The Context will interrupt the request if the timeout expires. + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + _, err := req.Send(ctx) return err } @@ -362,80 +373,122 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) { } func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC chan error) error { - s3Context := &s3Context{ + s3Ctx := &s3Context{ refs: 1, errC: errC, } - defer s3Context.done() + defer s3Ctx.done() - for _, s3Info := range s3Infos { - objectHash := s3ObjectHash(s3Info) - - // read from s3 object - reader, err := p.newS3BucketReader(svc, s3Info) + for _, info := range s3Infos { + err := p.createEventsFromS3Info(svc, info, s3Ctx) if err != nil { - err = errors.Wrap(err, "newS3BucketReader failed") - s3Context.setError(err) - return err + err = errors.Wrapf(err, "createEventsFromS3Info failed for %v", info.key) + p.logger.Error(err) + s3Ctx.setError(err) } + } + return nil +} - if reader == nil { - continue - } +func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3Ctx *s3Context) error { + objectHash := s3ObjectHash(info) - // Decode JSON documents when expand_event_list_from_field is given in config - if p.config.ExpandEventListFromField != "" { - decoder := json.NewDecoder(reader) - err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context) - if err != nil { - err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key) - s3Context.setError(err) - return err - } - return nil - } + // Download the S3 object using GetObjectRequest. + s3GetObjectInput := &s3.GetObjectInput{ + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), + } + req := svc.GetObjectRequest(s3GetObjectInput) + + // The Context will interrupt the request if the timeout expires. + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() - // handle s3 objects that are not json content-type - offset := 0 - for { - log, err := reader.ReadString('\n') - if log == "" { - break + resp, err := req.Send(ctx) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + // If the SDK can determine the request or retry delay was canceled + // by a context the ErrCodeRequestCanceled error will be returned. + if awsErr.Code() == awssdk.ErrCodeRequestCanceled { + err = errors.Wrap(err, "GetObject request canceled") + p.logger.Error(err) + return err } - if err == io.EOF { - // create event for last line - offset += len([]byte(log)) - event := createEvent(log, offset, s3Info, objectHash, s3Context) - err = p.forwardEvent(event) - if err != nil { - err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.setError(err) - return err - } + if awsErr.Code() == "NoSuchKey" { + p.logger.Warn("Cannot find s3 file") return nil - } else if err != nil { - err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key) - s3Context.setError(err) - return err } + } + return errors.Wrap(err, "s3 get object request failed") + } + + defer resp.Body.Close() + + reader := bufio.NewReader(resp.Body) + // Check content-type + if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") { + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + err = errors.Wrap(err, "gzip.NewReader failed") + p.logger.Error(err) + return err + } + reader = bufio.NewReader(gzipReader) + gzipReader.Close() + } + + // Decode JSON documents when expand_event_list_from_field is given in config + if p.config.ExpandEventListFromField != "" { + decoder := json.NewDecoder(reader) + err := p.decodeJSONWithKey(decoder, objectHash, info, s3Ctx) + if err != nil { + err = errors.Wrap(err, "decodeJSONWithKey failed") + p.logger.Error(err) + return err + } + return nil + } - // create event per log line + // handle s3 objects that are not json content-type + offset := 0 + for { + log, err := reader.ReadString('\n') + if log == "" { + break + } + + if err == io.EOF { + // create event for last line offset += len([]byte(log)) - event := createEvent(log, offset, s3Info, objectHash, s3Context) + event := createEvent(log, offset, info, objectHash, s3Ctx) err = p.forwardEvent(event) if err != nil { - err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - s3Context.setError(err) + err = errors.Wrap(err, "forwardEvent failed") + p.logger.Error(err) return err } + return nil + } else if err != nil { + err = errors.Wrap(err, "ReadString failed") + p.logger.Error(err) + return err } - } + // create event per log line + offset += len([]byte(log)) + event := createEvent(log, offset, info, objectHash, s3Ctx) + err = p.forwardEvent(event) + if err != nil { + err = errors.Wrap(err, "forwardEvent failed") + p.logger.Error(err) + return err + } + } return nil } -func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Context *s3Context) error { +func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { offset := 0 for { var jsonFields map[string][]interface{} @@ -449,15 +502,15 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 // get logs from expand_event_list_from_field textValues, ok := jsonFields[p.config.ExpandEventListFromField] if !ok { - err = errors.Wrapf(err, fmt.Sprintf("Key '%s' not found", p.config.ExpandEventListFromField)) + err = errors.Wrapf(err, fmt.Sprintf("key '%s' not found", p.config.ExpandEventListFromField)) p.logger.Error(err) return err } for _, v := range textValues { - err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Context) + err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx) if err != nil { - err = errors.Wrapf(err, fmt.Sprintf("convertJSONToEvent failed for %v", s3Info.key)) + err = errors.Wrap(err, "convertJSONToEvent failed") p.logger.Error(err) return err } @@ -476,7 +529,7 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 } for _, v := range textValues { - err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Context) + err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx) if err != nil { err = errors.Wrapf(err, fmt.Sprintf("Key '%s' not found", p.config.ExpandEventListFromField)) p.logger.Error(err) @@ -486,72 +539,21 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 } } -func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Context *s3Context) error { +func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { vJSON, err := json.Marshal(jsonFields) log := string(vJSON) offset += len([]byte(log)) - event := createEvent(log, offset, s3Info, objectHash, s3Context) + event := createEvent(log, offset, s3Info, objectHash, s3Ctx) err = p.forwardEvent(event) if err != nil { - err = errors.Wrapf(err, fmt.Sprintf("forwardEvent failed for %s", s3Info.key)) + err = errors.Wrap(err, fmt.Sprintf("forwardEvent failed")) p.logger.Error(err) return err } return nil } -func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufio.Reader, error) { - s3GetObjectInput := &s3.GetObjectInput{ - Bucket: awssdk.String(s3Info.name), - Key: awssdk.String(s3Info.key), - } - req := svc.GetObjectRequest(s3GetObjectInput) - - resp, err := req.Send(p.context) - if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - if awsErr.Code() == awssdk.ErrCodeRequestCanceled { - return nil, nil - } - - if awsErr.Code() == "NoSuchKey" { - p.logger.Warn("Cannot find s3 file with key ", s3Info.key) - return nil, nil - } - } - return nil, errors.Wrapf(err, "s3 get object request failed %v", s3Info.key) - } - - if resp.Body == nil { - return nil, errors.New("s3 get object response body is empty") - } - - // Check content-type - if resp.ContentType != nil { - switch *resp.ContentType { - case "application/x-gzip": - reader, err := gzip.NewReader(resp.Body) - if err != nil { - return nil, errors.Wrapf(err, "Failed to decompress gzipped file %v", s3Info.key) - } - return bufio.NewReader(reader), nil - default: - return bufio.NewReader(resp.Body), nil - } - } - - // If there is no content-type, check file name instead. - if strings.HasSuffix(s3Info.key, ".gz") { - gzipReader, err := gzip.NewReader(resp.Body) - if err != nil { - return nil, errors.Wrapf(err, "Failed to decompress gzipped file %v", s3Info.key) - } - return bufio.NewReader(gzipReader), nil - } - return bufio.NewReader(resp.Body), nil -} - func (p *s3Input) forwardEvent(event beat.Event) error { ok := p.outlet.OnEvent(event) if !ok { @@ -567,7 +569,12 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s } req := svcSQS.DeleteMessageRequest(deleteMessageInput) - _, err := req.Send(p.context) + + // The Context will interrupt the request if the timeout expires. + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + _, err := req.Send(ctx) if err != nil { if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled { return nil @@ -577,33 +584,33 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s return nil } -func createEvent(log string, offset int, s3Info s3Info, objectHash string, s3Context *s3Context) beat.Event { +func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { f := common.MapStr{ "message": log, "log": common.MapStr{ "offset": int64(offset), - "file.path": constructObjectURL(s3Info), + "file.path": constructObjectURL(info), }, "aws": common.MapStr{ "s3": common.MapStr{ "bucket": common.MapStr{ - "name": s3Info.name, - "arn": s3Info.arn}, - "object.key": s3Info.key, + "name": info.name, + "arn": info.arn}, + "object.key": info.key, }, }, "cloud": common.MapStr{ "provider": "aws", - "region": s3Info.region, + "region": info.region, }, } - s3Context.Inc() + s3Ctx.Inc() return beat.Event{ Timestamp: time.Now(), Fields: f, Meta: common.MapStr{"id": objectHash + "-" + fmt.Sprintf("%012d", offset)}, - Private: s3Context, + Private: s3Ctx, } } diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 9290b3f664d3..62d93a66e5fe 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -5,7 +5,9 @@ package s3 import ( + "bufio" "bytes" + "context" "fmt" "io" "io/ioutil" @@ -26,16 +28,10 @@ type MockS3Client struct { s3iface.ClientAPI } -// MockS3ClientErr struct is used for unit tests. -type MockS3ClientErr struct { - s3iface.ClientAPI -} - var ( s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png \n" s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png \n" mockSvc = &MockS3Client{} - mockSvcErr = &MockS3ClientErr{} info = s3Info{ name: "test-s3-ks", key: "log2019-06-21-16-16-54", @@ -56,16 +52,6 @@ func (m *MockS3Client) GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRe } } -func (m *MockS3ClientErr) GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRequest { - httpReq, _ := http.NewRequest("", "", nil) - return s3.GetObjectRequest{ - Request: &awssdk.Request{ - Data: &s3.GetObjectOutput{}, - HTTPRequest: httpReq, - }, - } -} - func TestGetRegionFromQueueURL(t *testing.T) { queueURL := "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs" regionName, err := getRegionFromQueueURL(queueURL) @@ -150,8 +136,22 @@ func TestHandleMessage(t *testing.T) { func TestNewS3BucketReader(t *testing.T) { p := &s3Input{context: &channelContext{}} - reader, err := p.newS3BucketReader(mockSvc, info) + s3GetObjectInput := &s3.GetObjectInput{ + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), + } + req := mockSvc.GetObjectRequest(s3GetObjectInput) + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + resp, err := req.Send(ctx) assert.NoError(t, err) + reader := bufio.NewReader(resp.Body) + defer resp.Body.Close() + for i := 0; i < 3; i++ { switch i { case 0: @@ -170,13 +170,6 @@ func TestNewS3BucketReader(t *testing.T) { } } -func TestNewS3BucketReaderErr(t *testing.T) { - p := &s3Input{context: &channelContext{}} - reader, err := p.newS3BucketReader(mockSvcErr, info) - assert.Error(t, err, "s3 get object response body is empty") - assert.Nil(t, reader) -} - func TestCreateEvent(t *testing.T) { p := &s3Input{context: &channelContext{}} errC := make(chan error) @@ -194,8 +187,22 @@ func TestCreateEvent(t *testing.T) { } s3ObjectHash := s3ObjectHash(s3Info) - reader, err := p.newS3BucketReader(mockSvc, s3Info) + s3GetObjectInput := &s3.GetObjectInput{ + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), + } + req := mockSvc.GetObjectRequest(s3GetObjectInput) + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) + defer cancelFn() + + resp, err := req.Send(ctx) assert.NoError(t, err) + reader := bufio.NewReader(resp.Body) + defer resp.Body.Close() + var events []beat.Event for { log, err := reader.ReadString('\n') diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index f069a6d3128a..e87956f950ce 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -14,6 +14,14 @@ # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + elb: enabled: false @@ -29,6 +37,14 @@ # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + vpcflow: enabled: false @@ -44,6 +60,14 @@ # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + cloudtrail: enabled: false @@ -58,3 +82,11 @@ # Profile name for aws credential # If not set the default profile is used #var.credential_profile_name: fb-aws + + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s diff --git a/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml b/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml index 6b340543a86a..87219497aef3 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml @@ -12,6 +12,13 @@ credential_profile_name: {{ .credential_profile_name }} shared_credential_file: {{ .shared_credential_file }} {{ end }} +{{ if .visibility_timeout }} +visibility_timeout: {{ .visibility_timeout }} +{{ end }} + +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} +{{ end }} {{ else if eq .input "file" }} diff --git a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml index 4865624045e7..d809dec5fb51 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml @@ -5,6 +5,8 @@ var: default: s3 - name: shared_credential_file - name: credential_profile_name + - name: visibility_timeout + - name: api_timeout ingest_pipeline: ingest/pipeline.yml input: config/cloudtrail.yml diff --git a/x-pack/filebeat/module/aws/elb/config/s3.yml b/x-pack/filebeat/module/aws/elb/config/s3.yml index c4c151708b92..40212723e968 100644 --- a/x-pack/filebeat/module/aws/elb/config/s3.yml +++ b/x-pack/filebeat/module/aws/elb/config/s3.yml @@ -8,3 +8,11 @@ credential_profile_name: {{ .credential_profile_name }} {{ if .shared_credential_file }} shared_credential_file: {{ .shared_credential_file }} {{ end }} + +{{ if .visibility_timeout }} +visibility_timeout: {{ .visibility_timeout }} +{{ end }} + +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} +{{ end }} diff --git a/x-pack/filebeat/module/aws/elb/manifest.yml b/x-pack/filebeat/module/aws/elb/manifest.yml index ca83ac2a3157..01857d90643d 100644 --- a/x-pack/filebeat/module/aws/elb/manifest.yml +++ b/x-pack/filebeat/module/aws/elb/manifest.yml @@ -5,6 +5,8 @@ var: default: s3 - name: shared_credential_file - name: credential_profile_name + - name: visibility_timeout + - name: api_timeout ingest_pipeline: ingest/pipeline.yml input: config/{{.input}}.yml diff --git a/x-pack/filebeat/module/aws/s3access/config/s3.yml b/x-pack/filebeat/module/aws/s3access/config/s3.yml index c4c151708b92..40212723e968 100644 --- a/x-pack/filebeat/module/aws/s3access/config/s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/s3.yml @@ -8,3 +8,11 @@ credential_profile_name: {{ .credential_profile_name }} {{ if .shared_credential_file }} shared_credential_file: {{ .shared_credential_file }} {{ end }} + +{{ if .visibility_timeout }} +visibility_timeout: {{ .visibility_timeout }} +{{ end }} + +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} +{{ end }} diff --git a/x-pack/filebeat/module/aws/s3access/manifest.yml b/x-pack/filebeat/module/aws/s3access/manifest.yml index 20c0ce4efc74..b4fdbb7ca49f 100644 --- a/x-pack/filebeat/module/aws/s3access/manifest.yml +++ b/x-pack/filebeat/module/aws/s3access/manifest.yml @@ -5,6 +5,8 @@ var: default: s3 - name: shared_credential_file - name: credential_profile_name + - name: visibility_timeout + - name: api_timeout ingest_pipeline: ingest/pipeline.yml input: config/{{.input}}.yml diff --git a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index 250ce449e555..1838d695eb4a 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -11,6 +11,14 @@ credential_profile_name: {{ .credential_profile_name }} shared_credential_file: {{ .shared_credential_file }} {{ end }} +{{ if .visibility_timeout }} +visibility_timeout: {{ .visibility_timeout }} +{{ end }} + +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} +{{ end }} + {{ else if eq .input "file" }} type: log diff --git a/x-pack/filebeat/module/aws/vpcflow/manifest.yml b/x-pack/filebeat/module/aws/vpcflow/manifest.yml index 9e047a606eb3..e438f1e91c30 100644 --- a/x-pack/filebeat/module/aws/vpcflow/manifest.yml +++ b/x-pack/filebeat/module/aws/vpcflow/manifest.yml @@ -5,6 +5,8 @@ var: default: s3 - name: shared_credential_file - name: credential_profile_name + - name: visibility_timeout + - name: api_timeout ingest_pipeline: ingest/pipeline.yml input: config/input.yml diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index 6bdbafe22056..9c0284b0c642 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -17,6 +17,14 @@ # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + elb: enabled: false @@ -32,6 +40,14 @@ # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + vpcflow: enabled: false @@ -47,6 +63,14 @@ # If not set the default profile is used #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s + cloudtrail: enabled: false @@ -61,3 +85,11 @@ # Profile name for aws credential # If not set the default profile is used #var.credential_profile_name: fb-aws + + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before AWS API request will be interrupted + # Default to be 120s + #var.api_timeout: 120s