From 445aaf26523283ffc3c9a812e1b8c0e7257cb6dd Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 15 Jan 2020 09:58:11 -0700 Subject: [PATCH 01/20] Add timeout to GetObjectRequest which will cancel the request if it takes too long --- x-pack/filebeat/input/s3/input.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index eef16d918fa..95a043e7e68 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -277,7 +277,7 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess if err != nil { p.logger.Error(errors.Wrap(err, "change message visibility failed")) } - p.logger.Warnf("Message visibility timeout updated to %v", visibilityTimeout) + p.logger.Infof("Message visibility timeout updated to %v", visibilityTimeout) } else { // When ACK done, message will be deleted. Or when message is // not s3 ObjectCreated event related(handleSQSMessage function @@ -502,17 +502,37 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH } func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufio.Reader, error) { + // Create a context with a timeout that will abort the download if it takes + // more than the default timeout 1 minute. + timeout := time.Duration(1 * time.Minute) + ctx := context.Background() + + var cancelFn func() + if timeout > 0 { + ctx, cancelFn = context.WithTimeout(ctx, timeout) + } + + // Ensure the context is canceled to prevent leaking. + // See context package for more information, https://golang.org/pkg/context/ + if cancelFn != nil { + defer cancelFn() + } + + // Download the S3 object using GetObjectRequest. The Context will interrupt + // the request if the timeout expires. s3GetObjectInput := &s3.GetObjectInput{ Bucket: awssdk.String(s3Info.name), Key: awssdk.String(s3Info.key), } req := svc.GetObjectRequest(s3GetObjectInput) - resp, err := req.Send(p.context) + resp, err := req.Send(ctx) if err != nil { if awsErr, ok := err.(awserr.Error); ok { + // If the SDK can determine the request or retry delay was canceled + // by a context the ErrCodeRequestCanceled error will be returned. if awsErr.Code() == awssdk.ErrCodeRequestCanceled { - return nil, nil + return nil, errors.Wrapf(err, "GetObject of s3 file with key %v failed due to timeout", s3Info.key) } if awsErr.Code() == "NoSuchKey" { From 90350ff4007995478ba0c8c166c00dbaaf8fff73 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 15 Jan 2020 10:39:19 -0700 Subject: [PATCH 02/20] Add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 574116773c1..419a73afc7e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -236,6 +236,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449] - netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449] - Fixed dashboard for Cisco ASA Firewall. {issue}15420[15420] {pull}15553[15553] +- Fix s3 input hanging with GetObjectRequest API call by adding timeout. {issue}15502[15502] {pull}15590[15590] *Heartbeat* From 269ea66e3d9c08a8861d325da8960251afe28103 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 16 Jan 2020 07:55:21 -0700 Subject: [PATCH 03/20] Add context_timeout into config for all aws filesets --- .../docs/inputs/input-aws-s3.asciidoc | 12 +++++-- x-pack/filebeat/filebeat.reference.yml | 32 +++++++++++++++++++ x-pack/filebeat/input/s3/config.go | 6 ++++ x-pack/filebeat/input/s3/input.go | 11 ++++--- x-pack/filebeat/module/aws/_meta/config.yml | 32 +++++++++++++++++++ .../aws/cloudtrail/config/cloudtrail.yml | 2 ++ x-pack/filebeat/module/aws/elb/config/s3.yml | 2 ++ .../module/aws/s3access/config/s3.yml | 2 ++ .../module/aws/vpcflow/config/input.yml | 2 ++ x-pack/filebeat/modules.d/aws.yml.disabled | 32 +++++++++++++++++++ 10 files changed, 127 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 4535a8946b0..448e28dbdb3 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -45,9 +45,9 @@ URL of the AWS SQS queue that messages will be received from. Required. [float] ==== `visibility_timeout` -The duration (in seconds) that the received messages are hidden from subsequent +The duration that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. -This value needs to be a lot bigger than filebeat collection frequency so +This value needs to be a lot bigger than Filebeat collection frequency so if it took too long to read the s3 log, this sqs message will not be reprocessed. The default visibility timeout for a message is 300 seconds. The minimum is 0 seconds. The maximum is 12 hours. @@ -61,6 +61,14 @@ can be assigned the name of the field. This setting will be able to split the messages under the group value into separate events. For example, CloudTrail logs are in JSON format and events are found under the JSON object "Records": +[float] +==== `context_timeout` + +The maximum duration of GetObjectRequest AWS API can take. If it exceeds the +timeout, GetObjectRequest will be interrupted. +The default context timeout for a message is 120 seconds. The minimum +is 0 seconds. The maximum is half of the visibility timeout value. + ["source","json"] ---- { diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 28f911bfa36..0ab507b0942 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -105,6 +105,14 @@ filebeat.modules: # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + elb: enabled: false @@ -114,6 +122,14 @@ filebeat.modules: # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + vpcflow: enabled: false @@ -123,6 +139,14 @@ filebeat.modules: # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + cloudtrail: enabled: false @@ -132,6 +156,14 @@ filebeat.modules: # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + #-------------------------------- Azure Module -------------------------------- - module: azure # All logs diff --git a/x-pack/filebeat/input/s3/config.go b/x-pack/filebeat/input/s3/config.go index 758209c44bf..a7ba643b4e9 100644 --- a/x-pack/filebeat/input/s3/config.go +++ b/x-pack/filebeat/input/s3/config.go @@ -18,6 +18,7 @@ type config struct { VisibilityTimeout time.Duration `config:"visibility_timeout"` AwsConfig awscommon.ConfigAWS `config:",inline"` ExpandEventListFromField string `config:"expand_event_list_from_field"` + ContextTimeout time.Duration `config:"context_timeout"` } func defaultConfig() config { @@ -26,6 +27,7 @@ func defaultConfig() config { Type: "s3", }, VisibilityTimeout: 300 * time.Second, + ContextTimeout: 120 * time.Second, } } @@ -34,5 +36,9 @@ func (c *config) Validate() error { return fmt.Errorf("visibility timeout %v is not within the "+ "required range 0s to 12h", c.VisibilityTimeout) } + if c.ContextTimeout < 0 || c.ContextTimeout > c.VisibilityTimeout/2 { + return fmt.Errorf("context timeout %v needs to be larger than"+ + " 0s and smaller than half of the visibility timeout", c.ContextTimeout) + } return nil } diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 95a043e7e68..0ddfdc74949 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -171,6 +171,8 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) + p.logger.Debugf("visibility timeout is set to %v seconds: ", visibilityTimeout) + regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { p.logger.Errorf("failed to get region name from queueURL: %v", p.config.QueueURL) @@ -503,13 +505,14 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufio.Reader, error) { // Create a context with a timeout that will abort the download if it takes - // more than the default timeout 1 minute. - timeout := time.Duration(1 * time.Minute) + // more than the default timeout 2 minute. + contextTimeout := p.config.ContextTimeout + p.logger.Debug("context timeout is set to: ", contextTimeout) ctx := context.Background() var cancelFn func() - if timeout > 0 { - ctx, cancelFn = context.WithTimeout(ctx, timeout) + if contextTimeout > 0 { + ctx, cancelFn = context.WithTimeout(ctx, contextTimeout) } // Ensure the context is canceled to prevent leaking. diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index 98ab79d69f5..4e23e85ab68 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -8,6 +8,14 @@ # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + elb: enabled: false @@ -17,6 +25,14 @@ # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + vpcflow: enabled: false @@ -26,6 +42,14 @@ # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + cloudtrail: enabled: false @@ -34,3 +58,11 @@ # Profile name for aws credential #var.credential_profile_name: fb-aws + + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s diff --git a/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml b/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml index 2b1c3b8551b..b35592c04c8 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml @@ -4,6 +4,8 @@ type: s3 queue_url: {{ .queue_url }} credential_profile_name: {{ .credential_profile_name }} expand_event_list_from_field: Records +visibility_timeout: {{ .visibility_timeout }} +context_timeout: {{ .context_timeout }} {{ else if eq .input "file" }} diff --git a/x-pack/filebeat/module/aws/elb/config/s3.yml b/x-pack/filebeat/module/aws/elb/config/s3.yml index 4bc46921c20..06ebc80ca68 100644 --- a/x-pack/filebeat/module/aws/elb/config/s3.yml +++ b/x-pack/filebeat/module/aws/elb/config/s3.yml @@ -1,3 +1,5 @@ type: s3 queue_url: {{ .queue_url }} credential_profile_name: {{ .credential_profile_name }} +visibility_timeout: {{ .visibility_timeout }} +context_timeout: {{ .context_timeout }} diff --git a/x-pack/filebeat/module/aws/s3access/config/s3.yml b/x-pack/filebeat/module/aws/s3access/config/s3.yml index 4bc46921c20..06ebc80ca68 100644 --- a/x-pack/filebeat/module/aws/s3access/config/s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/s3.yml @@ -1,3 +1,5 @@ type: s3 queue_url: {{ .queue_url }} credential_profile_name: {{ .credential_profile_name }} +visibility_timeout: {{ .visibility_timeout }} +context_timeout: {{ .context_timeout }} diff --git a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index 432abff6d37..b3a5d6de198 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -3,6 +3,8 @@ type: s3 queue_url: {{ .queue_url }} credential_profile_name: {{ .credential_profile_name }} +visibility_timeout: {{ .visibility_timeout }} +context_timeout: {{ .context_timeout }} {{ else if eq .input "file" }} diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index f43bed2eb56..0ac5082a1d8 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -11,6 +11,14 @@ # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + elb: enabled: false @@ -20,6 +28,14 @@ # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + vpcflow: enabled: false @@ -29,6 +45,14 @@ # Profile name for aws credential #var.credential_profile_name: fb-aws + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s + cloudtrail: enabled: false @@ -37,3 +61,11 @@ # Profile name for aws credential #var.credential_profile_name: fb-aws + + # The duration that the received messages are hidden from ReceiveMessage request + # Default to be 300s + #var.visibility_timeout: 300s + + # Maximum duration before GetObject request will be interrupted by context + # Default to be 120s + #var.context_timeout: 120s From 32678e7b6560b81a201e512c33266e4c7bbf46bb Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 16 Jan 2020 08:55:28 -0700 Subject: [PATCH 04/20] fix unit test --- CHANGELOG.next.asciidoc | 2 +- x-pack/filebeat/input/s3/input.go | 2 +- x-pack/filebeat/input/s3/input_test.go | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 419a73afc7e..ec233e7319f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -236,7 +236,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449] - netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449] - Fixed dashboard for Cisco ASA Firewall. {issue}15420[15420] {pull}15553[15553] -- Fix s3 input hanging with GetObjectRequest API call by adding timeout. {issue}15502[15502] {pull}15590[15590] +- Fix s3 input hanging with GetObjectRequest API call by adding context_timeout config. {issue}15502[15502] {pull}15590[15590] *Heartbeat* diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 0ddfdc74949..22e8cfe1716 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -172,6 +172,7 @@ func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) p.logger.Debugf("visibility timeout is set to %v seconds: ", visibilityTimeout) + p.logger.Debugf("context timeout is set to %v: ", p.config.ContextTimeout) regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { @@ -507,7 +508,6 @@ func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufi // Create a context with a timeout that will abort the download if it takes // more than the default timeout 2 minute. contextTimeout := p.config.ContextTimeout - p.logger.Debug("context timeout is set to: ", contextTimeout) ctx := context.Background() var cancelFn func() diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 9290b3f664d..dfd88539cb8 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -11,6 +11,7 @@ import ( "io/ioutil" "net/http" "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -149,7 +150,11 @@ func TestHandleMessage(t *testing.T) { } func TestNewS3BucketReader(t *testing.T) { - p := &s3Input{context: &channelContext{}} + configTest := config{ + ContextTimeout: time.Duration(100 * time.Second), + } + p := &s3Input{context: &channelContext{}, config: configTest} + reader, err := p.newS3BucketReader(mockSvc, info) assert.NoError(t, err) for i := 0; i < 3; i++ { From dee7785738817bbb8470f729d3fe71ff331e334c Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 17 Jan 2020 13:37:02 -0700 Subject: [PATCH 05/20] Close resp.Body from S3 GetObject API to prevent resource leak --- CHANGELOG.next.asciidoc | 63 ------------------------------- x-pack/filebeat/input/s3/input.go | 6 ++- 2 files changed, 5 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 77188411c9d..d6c013edbad 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -45,69 +45,6 @@ TLS or Beats that accept connections over TLS and validate client certificates. *Filebeat* -- Add support for Cisco syslog format used by their switch. {pull}10760[10760] -- Cover empty request data, url and version in Apache2 module{pull}10730[10730] -- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747] -- Improve detection of file deletion on Windows. {pull}10747[10747] -- Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591] -- Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] -- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] -- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] -- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164] -- Skipping unparsable log entries from docker json reader {pull}12268[12268] -- Parse timezone in PostgreSQL logs as part of the timestamp {pull}12338[12338] -- Load correct pipelines when system module is configured in modules.d. {pull}12340[12340] -- Fix timezone offset parsing in system/syslog. {pull}12529[12529] -- When TLS is configured for the TCP input and a `certificate_authorities` is configured we now default to `required` for the `client_authentication`. {pull}12584[12584] -- Apply `max_message_size` to incoming message buffer. {pull}11966[11966] -- Syslog input will now omit the `process` object from events if it is empty. {pull}12700[12700] -- Fix multiline pattern in Postgres which was too permissive {issue}12078[12078] {pull}13069[13069] -- Allow path variables to be used in files loaded from modules.d. {issue}13184[13184] -- Fix filebeat autodiscover fileset hint for container input. {pull}13296[13296] -- Fix incorrect references to index patterns in AWS and CoreDNS dashboards. {pull}13303[13303] -- Fix timezone parsing of system module ingest pipelines. {pull}13308[13308] -- Fix timezone parsing of elasticsearch module ingest pipelines. {pull}13367[13367] -- Change iis url path grok pattern from URIPATH to NOTSPACE. {issue}12710[12710] {pull}13225[13225] {issue}7951[7951] {pull}13378[13378] {pull}14754[14754] -- Fix timezone parsing of nginx module ingest pipelines. {pull}13369[13369] -- Fix incorrect field references in envoyproxy dashboard {issue}13420[13420] {pull}13421[13421] -- Fixed early expiration of templates (Netflow v9 and IPFIX). {pull}13821[13821] -- Fixed bad handling of sequence numbers when multiple observation domains were exported by a single device (Netflow V9 and IPFIX). {pull}13821[13821] -- Fix timezone parsing of rabbitmq module ingest pipelines. {pull}13879[13879] -- Fix conditions and error checking of date processors in ingest pipelines that use `event.timezone` to parse dates. {pull}13883[13883] -- Fix timezone parsing of Cisco module ingest pipelines. {pull}13893[13893] -- Fix timezone parsing of logstash module ingest pipelines. {pull}13890[13890] -- cisco asa and ftd filesets: Fix parsing of message 106001. {issue}13891[13891] {pull}13903[13903] -- Fix timezone parsing of iptables, mssql and panw module ingest pipelines. {pull}13926[13926] -- Fix merging of fields specified in global scope with fields specified under an input's scope. {issue}3628[3628] {pull}13909[13909] -- Fix delay in enforcing close_renamed and close_removed options. {issue}13488[13488] {pull}13907[13907] -- Fix missing netflow fields in index template. {issue}13768[13768] {pull}13914[13914] -- Fix cisco module's asa and ftd filesets parsing of domain names where an IP address is expected. {issue}14034[14034] -- Fixed increased memory usage with large files when multiline pattern does not match. {issue}14068[14068] -- panw module: Use geo.name instead of geo.country_iso_code for free-form location. {issue}13272[13272] -- Fix azure fields names. {pull}14098[14098] -- Fix calculation of `network.bytes` and `network.packets` for bi-directional netflow events. {pull}14111[14111] -- Accept '-' as http.response.body.bytes in apache module. {pull}14137[14137] -- Fix timezone parsing of MySQL module ingest pipelines. {pull}14130[14130] -- Fix azure filesets test files. {issue}14185[14185] {pull}14235[14235] -- Improve error message in s3 input when handleSQSMessage failed. {pull}14113[14113] -- Close chan of Closer first before calling callback {pull}14231[14231] -- Fix race condition in S3 input plugin. {pull}14359[14359] -- Decode hex values in auditd module. {pull}14471[14471] -- Fix parse of remote addresses that are not IPs in nginx logs. {pull}14505[14505] -- Fix handling multiline log entries in nginx module. {issue}14349[14349] {pull}14499[14499] -- Fix parsing of Elasticsearch node name by `elasticsearch/slowlog` fileset. {pull}14547[14547] -- cisco/asa fileset: Fix parsing of 302021 message code. {pull}14519[14519] -- Fix filebeat azure dashboards, event category should be `Alert`. {pull}14668[14668] -- Update Logstash module's Grok patterns to support Logstash 7.4 logs. {pull}14743[14743] -- Fix a problem in Filebeat input httpjson where interval is not used as time.Duration. {issue}14752[14752] {pull}14753[14753] -- Fix SSL config in input.yml for Filebeat httpjson input in the MISP module. {pull}14767[14767] -- Check content-type when creating new reader in s3 input. {pull}15252[15252] {issue}15225[15225] -- Fix session reset detection and a crash in Netflow input. {pull}14904[14904] -- Handle errors in handleS3Objects function and add more debug messages for s3 input. {pull}15545[15545] -- netflow: Allow for options templates without scope fields. {pull}15449[15449] -- netflow: Fix bytes/packets counters on some devices (NSEL and Netstream). {pull}15449[15449] -- netflow: Fix compatibility with some Cisco devices by changing the field `class_id` from short to long. {pull}15449[15449] -- Fixed dashboard for Cisco ASA Firewall. {issue}15420[15420] {pull}15553[15553] - Fix s3 input hanging with GetObjectRequest API call by adding context_timeout config. {issue}15502[15502] {pull}15590[15590] *Heartbeat* diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 22e8cfe1716..069d3dbf192 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -201,7 +201,7 @@ func (p *s3Input) run(svcSQS sqsiface.ClientAPI, svcS3 s3iface.ClientAPI, visibi if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled { continue } - p.logger.Error("failed to receive message from SQS:", err) + p.logger.Error("failed to receive message from SQS: ", err) time.Sleep(time.Duration(waitTimeSecond) * time.Second) continue } @@ -546,6 +546,10 @@ func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufi return nil, errors.Wrapf(err, "s3 get object request failed %v", s3Info.key) } + // Make sure to close the body when done with it for S3 GetObject APIs or + // will leak connections. + defer resp.Body.Close() + if resp.Body == nil { return nil, errors.New("s3 get object response body is empty") } From adf908afcc848e2491ebc4aae46a35ac1f28b6b1 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 17 Jan 2020 15:27:58 -0700 Subject: [PATCH 06/20] fix unit test --- x-pack/filebeat/input/s3/input.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 069d3dbf192..f13e451e1c3 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -546,14 +546,14 @@ func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufi return nil, errors.Wrapf(err, "s3 get object request failed %v", s3Info.key) } - // Make sure to close the body when done with it for S3 GetObject APIs or - // will leak connections. - defer resp.Body.Close() - if resp.Body == nil { return nil, errors.New("s3 get object response body is empty") } + // Make sure to close the body when done with it for S3 GetObject APIs or + // will leak connections. + defer resp.Body.Close() + // Check content-type if resp.ContentType != nil { switch *resp.ContentType { From b211663d0f3e974c52dc10563251b50c2ea4b872 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 21 Jan 2020 14:13:47 -0700 Subject: [PATCH 07/20] close resp.Body after getS3ObjectResponse function --- x-pack/filebeat/input/s3/input.go | 99 +++++++++++++++----------- x-pack/filebeat/input/s3/input_test.go | 20 +++--- 2 files changed, 65 insertions(+), 54 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index f13e451e1c3..966c4bde71f 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -171,8 +171,8 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) - p.logger.Debugf("visibility timeout is set to %v seconds: ", visibilityTimeout) - p.logger.Debugf("context timeout is set to %v: ", p.config.ContextTimeout) + p.logger.Infof("visibility timeout is set to %v seconds: ", visibilityTimeout) + p.logger.Infof("context timeout is set to %v: ", p.config.ContextTimeout) regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { @@ -275,7 +275,7 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess return case err := <-errC: if err != nil { - p.logger.Warnf("Processing message failed: %v", err) + p.logger.Warn("Processing message failed, updating visibility timeout") err := p.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle) if err != nil { p.logger.Error(errors.Wrap(err, "change message visibility failed")) @@ -375,15 +375,49 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC objectHash := s3ObjectHash(s3Info) // read from s3 object - reader, err := p.newS3BucketReader(svc, s3Info) + resp, err := p.getS3ObjectResponse(svc, s3Info) if err != nil { err = errors.Wrap(err, "newS3BucketReader failed") + p.logger.Error(err) s3Context.setError(err) return err } - if reader == nil { - continue + if resp == nil { + resp.Body.Close() + return nil + } + + reader := bufio.NewReader(resp.Body) + // Check content-type + if resp.ContentType != nil { + switch *resp.ContentType { + case "application/x-gzip": + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + err = errors.Wrapf(err, "Failed to decompress application/x-gzip file %v", s3Info.key) + p.logger.Error(err) + s3Context.setError(err) + resp.Body.Close() + return err + } + reader = bufio.NewReader(gzipReader) + gzipReader.Close() + default: + reader = bufio.NewReader(resp.Body) + } + } else if strings.HasSuffix(s3Info.key, ".gz") { + // If there is no content-type, check file name instead. + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + err = errors.Wrapf(err, "Failed to decompress file with .gz suffix %v", s3Info.key) + p.logger.Error(err) + s3Context.setError(err) + resp.Body.Close() + return err + } + reader = bufio.NewReader(gzipReader) + gzipReader.Close() } // Decode JSON documents when expand_event_list_from_field is given in config @@ -392,9 +426,12 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context) if err != nil { err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key) + p.logger.Error(err) s3Context.setError(err) + resp.Body.Close() return err } + resp.Body.Close() return nil } @@ -413,13 +450,18 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err = p.forwardEvent(event) if err != nil { err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) + p.logger.Error(err) s3Context.setError(err) + resp.Body.Close() return err } + resp.Body.Close() return nil } else if err != nil { err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key) + p.logger.Error(err) s3Context.setError(err) + resp.Body.Close() return err } @@ -429,12 +471,13 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC err = p.forwardEvent(event) if err != nil { err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) + p.logger.Error(err) s3Context.setError(err) + resp.Body.Close() return err } } } - return nil } @@ -504,7 +547,7 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH return nil } -func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufio.Reader, error) { +func (p *s3Input) getS3ObjectResponse(svc s3iface.ClientAPI, s3Info s3Info) (*s3.GetObjectResponse, error) { // Create a context with a timeout that will abort the download if it takes // more than the default timeout 2 minute. contextTimeout := p.config.ContextTimeout @@ -535,7 +578,9 @@ func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufi // If the SDK can determine the request or retry delay was canceled // by a context the ErrCodeRequestCanceled error will be returned. if awsErr.Code() == awssdk.ErrCodeRequestCanceled { - return nil, errors.Wrapf(err, "GetObject of s3 file with key %v failed due to timeout", s3Info.key) + err = errors.Wrapf(err, "GetObject of s3 file with key %v failed due to timeout", s3Info.key) + p.logger.Error(err) + return nil, err } if awsErr.Code() == "NoSuchKey" { @@ -545,38 +590,7 @@ func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, s3Info s3Info) (*bufi } return nil, errors.Wrapf(err, "s3 get object request failed %v", s3Info.key) } - - if resp.Body == nil { - return nil, errors.New("s3 get object response body is empty") - } - - // Make sure to close the body when done with it for S3 GetObject APIs or - // will leak connections. - defer resp.Body.Close() - - // Check content-type - if resp.ContentType != nil { - switch *resp.ContentType { - case "application/x-gzip": - reader, err := gzip.NewReader(resp.Body) - if err != nil { - return nil, errors.Wrapf(err, "Failed to decompress gzipped file %v", s3Info.key) - } - return bufio.NewReader(reader), nil - default: - return bufio.NewReader(resp.Body), nil - } - } - - // If there is no content-type, check file name instead. - if strings.HasSuffix(s3Info.key, ".gz") { - gzipReader, err := gzip.NewReader(resp.Body) - if err != nil { - return nil, errors.Wrapf(err, "Failed to decompress gzipped file %v", s3Info.key) - } - return bufio.NewReader(gzipReader), nil - } - return bufio.NewReader(resp.Body), nil + return resp, nil } func (p *s3Input) forwardEvent(event beat.Event) error { @@ -594,7 +608,8 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s } req := svcSQS.DeleteMessageRequest(deleteMessageInput) - _, err := req.Send(p.context) + ctx := context.Background() + _, err := req.Send(ctx) if err != nil { if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled { return nil diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index dfd88539cb8..db5a3675d2a 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -5,6 +5,7 @@ package s3 import ( + "bufio" "bytes" "fmt" "io" @@ -13,13 +14,13 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/beat" + awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/s3iface" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/libbeat/beat" ) // MockS3Client struct is used for unit tests. @@ -149,13 +150,14 @@ func TestHandleMessage(t *testing.T) { } -func TestNewS3BucketReader(t *testing.T) { +func TestGetS3ObjectResponse(t *testing.T) { configTest := config{ ContextTimeout: time.Duration(100 * time.Second), } p := &s3Input{context: &channelContext{}, config: configTest} - reader, err := p.newS3BucketReader(mockSvc, info) + resp, err := p.getS3ObjectResponse(mockSvc, info) + reader := bufio.NewReader(resp.Body) assert.NoError(t, err) for i := 0; i < 3; i++ { switch i { @@ -175,13 +177,6 @@ func TestNewS3BucketReader(t *testing.T) { } } -func TestNewS3BucketReaderErr(t *testing.T) { - p := &s3Input{context: &channelContext{}} - reader, err := p.newS3BucketReader(mockSvcErr, info) - assert.Error(t, err, "s3 get object response body is empty") - assert.Nil(t, reader) -} - func TestCreateEvent(t *testing.T) { p := &s3Input{context: &channelContext{}} errC := make(chan error) @@ -199,7 +194,8 @@ func TestCreateEvent(t *testing.T) { } s3ObjectHash := s3ObjectHash(s3Info) - reader, err := p.newS3BucketReader(mockSvc, s3Info) + resp, err := p.getS3ObjectResponse(mockSvc, s3Info) + reader := bufio.NewReader(resp.Body) assert.NoError(t, err) var events []beat.Event for { From c0c511e12884ccc2d5a3830c032034b40fa4666f Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 21 Jan 2020 16:11:35 -0700 Subject: [PATCH 08/20] use context.WithTimeout for GetObjectRequest --- x-pack/filebeat/input/s3/input.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 966c4bde71f..bd90e075974 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -550,19 +550,8 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH func (p *s3Input) getS3ObjectResponse(svc s3iface.ClientAPI, s3Info s3Info) (*s3.GetObjectResponse, error) { // Create a context with a timeout that will abort the download if it takes // more than the default timeout 2 minute. - contextTimeout := p.config.ContextTimeout ctx := context.Background() - - var cancelFn func() - if contextTimeout > 0 { - ctx, cancelFn = context.WithTimeout(ctx, contextTimeout) - } - - // Ensure the context is canceled to prevent leaking. - // See context package for more information, https://golang.org/pkg/context/ - if cancelFn != nil { - defer cancelFn() - } + ctx, _ = context.WithTimeout(ctx, p.config.ContextTimeout) // Download the S3 object using GetObjectRequest. The Context will interrupt // the request if the timeout expires. From 48e85f273e52a6841342512ec87db9594bbd9ff8 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 22 Jan 2020 14:07:35 -0700 Subject: [PATCH 09/20] Add timeout to p.context --- x-pack/filebeat/input/s3/input.go | 67 +++++++++++++------------------ 1 file changed, 27 insertions(+), 40 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index bd90e075974..21195af3f21 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -64,9 +64,9 @@ type s3Input struct { awsConfig awssdk.Config logger *logp.Logger close chan struct{} - workerOnce sync.Once // Guarantees that the worker goroutine is only started once. - context *channelContext - workerWg sync.WaitGroup // Waits on s3 worker goroutine. + workerOnce sync.Once // Guarantees that the worker goroutine is only started once. + inputCtx context.Context // Wraps the Done channel from parent input.Context. + workerWg sync.WaitGroup // Waits on s3 worker goroutine. stopOnce sync.Once } @@ -107,25 +107,8 @@ type s3Context struct { errC chan error } -// channelContext implements context.Context by wrapping a channel -type channelContext struct { - done <-chan struct{} -} - -func (c *channelContext) Deadline() (time.Time, bool) { return time.Time{}, false } -func (c *channelContext) Done() <-chan struct{} { return c.done } -func (c *channelContext) Err() error { - select { - case <-c.done: - return context.Canceled - default: - return nil - } -} -func (c *channelContext) Value(key interface{}) interface{} { return nil } - // NewInput creates a new s3 input -func NewInput(cfg *common.Config, connector channel.Connector, context input.Context) (input.Input, error) { +func NewInput(cfg *common.Config, connector channel.Connector, inputContext input.Context) (input.Input, error) { cfgwarn.Beta("s3 input type is used") logger := logp.NewLogger(inputName) @@ -136,7 +119,7 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con out, err := connector.ConnectWith(cfg, beat.ClientConfig{ Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, + DynamicFields: inputContext.DynamicFields, }, ACKEvents: func(privates []interface{}) { for _, private := range privates { @@ -156,13 +139,29 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con } closeChannel := make(chan struct{}) + + // Wrap input.Context's Done channel with a context.Context. This goroutine + // stops with the parent closes the Done channel. + // Create a context with a timeout that will abort the API call if it takes + // more than the default timeout 2 minute. + ctx := context.Background() + ctx, _ = context.WithTimeout(ctx, config.ContextTimeout) + inputCtx, cancelInputCtx := context.WithCancel(ctx) + go func() { + defer cancelInputCtx() + select { + case <-inputContext.Done: + case <-inputCtx.Done(): + } + }() + p := &s3Input{ outlet: out, config: config, awsConfig: awsConfig, logger: logger, close: closeChannel, - context: &channelContext{closeChannel}, + inputCtx: inputCtx, } return p, nil } @@ -194,7 +193,7 @@ func (p *s3Input) run(svcSQS sqsiface.ClientAPI, svcS3 s3iface.ClientAPI, visibi defer p.logger.Infof("s3 input worker for '%v' has stopped.", p.config.QueueURL) p.logger.Infof("s3 input worker has started. with queueURL: %v", p.config.QueueURL) - for p.context.Err() == nil { + for p.inputCtx.Err() == nil { // receive messages from sqs output, err := p.receiveMessage(svcSQS, visibilityTimeout) if err != nil { @@ -221,7 +220,7 @@ func (p *s3Input) Stop() { p.stopOnce.Do(func() { defer p.outlet.Close() close(p.close) - p.context.Done() + p.inputCtx.Done() p.logger.Info("Stopping s3 input") }) } @@ -317,7 +316,7 @@ func (p *s3Input) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeout in WaitTimeSeconds: &waitTimeSecond, }) - return req.Send(p.context) + return req.Send(p.inputCtx) } func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int64, svcSQS sqsiface.ClientAPI, receiptHandle *string) error { @@ -326,7 +325,7 @@ func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int VisibilityTimeout: &visibilityTimeout, ReceiptHandle: receiptHandle, }) - _, err := req.Send(p.context) + _, err := req.Send(p.inputCtx) return err } @@ -383,11 +382,6 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC return err } - if resp == nil { - resp.Body.Close() - return nil - } - reader := bufio.NewReader(resp.Body) // Check content-type if resp.ContentType != nil { @@ -403,8 +397,6 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC } reader = bufio.NewReader(gzipReader) gzipReader.Close() - default: - reader = bufio.NewReader(resp.Body) } } else if strings.HasSuffix(s3Info.key, ".gz") { // If there is no content-type, check file name instead. @@ -548,11 +540,6 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH } func (p *s3Input) getS3ObjectResponse(svc s3iface.ClientAPI, s3Info s3Info) (*s3.GetObjectResponse, error) { - // Create a context with a timeout that will abort the download if it takes - // more than the default timeout 2 minute. - ctx := context.Background() - ctx, _ = context.WithTimeout(ctx, p.config.ContextTimeout) - // Download the S3 object using GetObjectRequest. The Context will interrupt // the request if the timeout expires. s3GetObjectInput := &s3.GetObjectInput{ @@ -561,7 +548,7 @@ func (p *s3Input) getS3ObjectResponse(svc s3iface.ClientAPI, s3Info s3Info) (*s3 } req := svc.GetObjectRequest(s3GetObjectInput) - resp, err := req.Send(ctx) + resp, err := req.Send(p.inputCtx) if err != nil { if awsErr, ok := err.(awserr.Error); ok { // If the SDK can determine the request or retry delay was canceled From bb5201f049766390863eb08fe12697cca888c460 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 22 Jan 2020 15:44:07 -0700 Subject: [PATCH 10/20] Fix unit test for s3 input --- x-pack/filebeat/input/s3/input_test.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index db5a3675d2a..169859129be 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -7,20 +7,20 @@ package s3 import ( "bufio" "bytes" + "context" "fmt" "io" "io/ioutil" "net/http" "testing" - "time" - - "github.com/elastic/beats/libbeat/beat" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/s3iface" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" ) // MockS3Client struct is used for unit tests. @@ -151,11 +151,7 @@ func TestHandleMessage(t *testing.T) { } func TestGetS3ObjectResponse(t *testing.T) { - configTest := config{ - ContextTimeout: time.Duration(100 * time.Second), - } - p := &s3Input{context: &channelContext{}, config: configTest} - + p := &s3Input{inputCtx: context.Background()} resp, err := p.getS3ObjectResponse(mockSvc, info) reader := bufio.NewReader(resp.Body) assert.NoError(t, err) @@ -178,7 +174,7 @@ func TestGetS3ObjectResponse(t *testing.T) { } func TestCreateEvent(t *testing.T) { - p := &s3Input{context: &channelContext{}} + p := &s3Input{inputCtx: context.Background()} errC := make(chan error) s3Context := &s3Context{ refs: 1, From 1a09c7456b81632fe2c63a999a4bb856665c41ed Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 23 Jan 2020 07:15:15 -0700 Subject: [PATCH 11/20] Add cancelFn for context.WithTimeout --- x-pack/filebeat/input/s3/input.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 21195af3f21..c1baf4dd817 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -145,7 +145,10 @@ func NewInput(cfg *common.Config, connector channel.Connector, inputContext inpu // Create a context with a timeout that will abort the API call if it takes // more than the default timeout 2 minute. ctx := context.Background() - ctx, _ = context.WithTimeout(ctx, config.ContextTimeout) + var cancelFn func() + ctx, cancelFn = context.WithTimeout(ctx, config.ContextTimeout) + defer cancelFn() + inputCtx, cancelInputCtx := context.WithCancel(ctx) go func() { defer cancelInputCtx() From a1e4e188145283cf8f32c698610d401d5377d943 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 23 Jan 2020 12:40:03 -0700 Subject: [PATCH 12/20] Change context_timeout to aws_api_timeout --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 8 ++++---- x-pack/filebeat/input/s3/config.go | 10 +++++----- x-pack/filebeat/input/s3/input.go | 4 ++-- .../module/aws/cloudtrail/config/cloudtrail.yml | 9 +++++++-- x-pack/filebeat/module/aws/elb/config/s3.yml | 4 ++-- x-pack/filebeat/module/aws/s3access/config/s3.yml | 4 ++-- x-pack/filebeat/module/aws/vpcflow/config/input.yml | 4 ++-- 7 files changed, 24 insertions(+), 19 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 448e28dbdb3..c2d08fd3850 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -62,11 +62,11 @@ messages under the group value into separate events. For example, CloudTrail log are in JSON format and events are found under the JSON object "Records": [float] -==== `context_timeout` +==== `aws_api_timeout` -The maximum duration of GetObjectRequest AWS API can take. If it exceeds the -timeout, GetObjectRequest will be interrupted. -The default context timeout for a message is 120 seconds. The minimum +The maximum duration of AWS API can take. If it exceeds the timeout, AWS API +will be interrupted. +The default AWS API timeout for a message is 120 seconds. The minimum is 0 seconds. The maximum is half of the visibility timeout value. ["source","json"] diff --git a/x-pack/filebeat/input/s3/config.go b/x-pack/filebeat/input/s3/config.go index a7ba643b4e9..c4827c9a6e6 100644 --- a/x-pack/filebeat/input/s3/config.go +++ b/x-pack/filebeat/input/s3/config.go @@ -18,7 +18,7 @@ type config struct { VisibilityTimeout time.Duration `config:"visibility_timeout"` AwsConfig awscommon.ConfigAWS `config:",inline"` ExpandEventListFromField string `config:"expand_event_list_from_field"` - ContextTimeout time.Duration `config:"context_timeout"` + AwsApiTimeout time.Duration `config:"aw_api_timeout"` } func defaultConfig() config { @@ -27,7 +27,7 @@ func defaultConfig() config { Type: "s3", }, VisibilityTimeout: 300 * time.Second, - ContextTimeout: 120 * time.Second, + AwsApiTimeout: 120 * time.Second, } } @@ -36,9 +36,9 @@ func (c *config) Validate() error { return fmt.Errorf("visibility timeout %v is not within the "+ "required range 0s to 12h", c.VisibilityTimeout) } - if c.ContextTimeout < 0 || c.ContextTimeout > c.VisibilityTimeout/2 { - return fmt.Errorf("context timeout %v needs to be larger than"+ - " 0s and smaller than half of the visibility timeout", c.ContextTimeout) + if c.AwsApiTimeout < 0 || c.AwsApiTimeout > c.VisibilityTimeout/2 { + return fmt.Errorf("aws api timeout %v needs to be larger than"+ + " 0s and smaller than half of the visibility timeout", c.AwsApiTimeout) } return nil } diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index c1baf4dd817..03415faaac4 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -146,7 +146,7 @@ func NewInput(cfg *common.Config, connector channel.Connector, inputContext inpu // more than the default timeout 2 minute. ctx := context.Background() var cancelFn func() - ctx, cancelFn = context.WithTimeout(ctx, config.ContextTimeout) + ctx, cancelFn = context.WithTimeout(ctx, config.AwsApiTimeout) defer cancelFn() inputCtx, cancelInputCtx := context.WithCancel(ctx) @@ -174,7 +174,7 @@ func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) p.logger.Infof("visibility timeout is set to %v seconds: ", visibilityTimeout) - p.logger.Infof("context timeout is set to %v: ", p.config.ContextTimeout) + p.logger.Infof("aws api timeout is set to %v: ", p.config.AwsApiTimeout) regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { diff --git a/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml b/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml index e021dac956e..766b93d0427 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml @@ -3,8 +3,6 @@ type: s3 queue_url: {{ .queue_url }} expand_event_list_from_field: Records -visibility_timeout: {{ .visibility_timeout }} -context_timeout: {{ .context_timeout }} {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} @@ -14,6 +12,13 @@ credential_profile_name: {{ .credential_profile_name }} shared_credential_file: {{ .shared_credential_file }} {{ end }} +{{ if .visibility_timeout }} +visibility_timeout: {{ .visibility_timeout }} +{{ end }} + +{{ if .aws_api_timeout }} +aws_api_timeout: {{ .aws_api_timeout }} +{{ end }} {{ else if eq .input "file" }} diff --git a/x-pack/filebeat/module/aws/elb/config/s3.yml b/x-pack/filebeat/module/aws/elb/config/s3.yml index 4d988844cd4..f9672ad185b 100644 --- a/x-pack/filebeat/module/aws/elb/config/s3.yml +++ b/x-pack/filebeat/module/aws/elb/config/s3.yml @@ -13,6 +13,6 @@ shared_credential_file: {{ .shared_credential_file }} visibility_timeout: {{ .visibility_timeout }} {{ end }} -{{ if .context_timeout }} -context_timeout: {{ .context_timeout }} +{{ if .aws_api_timeout }} +aws_api_timeout: {{ .aws_api_timeout }} {{ end }} diff --git a/x-pack/filebeat/module/aws/s3access/config/s3.yml b/x-pack/filebeat/module/aws/s3access/config/s3.yml index 4d988844cd4..f9672ad185b 100644 --- a/x-pack/filebeat/module/aws/s3access/config/s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/s3.yml @@ -13,6 +13,6 @@ shared_credential_file: {{ .shared_credential_file }} visibility_timeout: {{ .visibility_timeout }} {{ end }} -{{ if .context_timeout }} -context_timeout: {{ .context_timeout }} +{{ if .aws_api_timeout }} +aws_api_timeout: {{ .aws_api_timeout }} {{ end }} diff --git a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index 0eeb0eb9a64..6264db44081 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -15,8 +15,8 @@ shared_credential_file: {{ .shared_credential_file }} visibility_timeout: {{ .visibility_timeout }} {{ end }} -{{ if .context_timeout }} -context_timeout: {{ .context_timeout }} +{{ if .aws_api_timeout }} +aws_api_timeout: {{ .aws_api_timeout }} {{ end }} {{ else if eq .input "file" }} From fb07d963a940e289e946c6f9ea5988afbc0accca Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 23 Jan 2020 16:23:14 -0700 Subject: [PATCH 13/20] create separate ctx with timeout for each request --- x-pack/filebeat/input/s3/config.go | 4 +- x-pack/filebeat/input/s3/input.go | 242 ++++++++++++------------- x-pack/filebeat/input/s3/input_test.go | 24 +-- 3 files changed, 124 insertions(+), 146 deletions(-) diff --git a/x-pack/filebeat/input/s3/config.go b/x-pack/filebeat/input/s3/config.go index c4827c9a6e6..fa672ab90d1 100644 --- a/x-pack/filebeat/input/s3/config.go +++ b/x-pack/filebeat/input/s3/config.go @@ -18,7 +18,7 @@ type config struct { VisibilityTimeout time.Duration `config:"visibility_timeout"` AwsConfig awscommon.ConfigAWS `config:",inline"` ExpandEventListFromField string `config:"expand_event_list_from_field"` - AwsApiTimeout time.Duration `config:"aw_api_timeout"` + AwsApiTimeout time.Duration `config:"aws_api_timeout"` } func defaultConfig() config { @@ -27,7 +27,7 @@ func defaultConfig() config { Type: "s3", }, VisibilityTimeout: 300 * time.Second, - AwsApiTimeout: 120 * time.Second, + AwsApiTimeout: 120 * time.Second, } } diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 03415faaac4..03b8a4d2669 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -142,14 +142,7 @@ func NewInput(cfg *common.Config, connector channel.Connector, inputContext inpu // Wrap input.Context's Done channel with a context.Context. This goroutine // stops with the parent closes the Done channel. - // Create a context with a timeout that will abort the API call if it takes - // more than the default timeout 2 minute. - ctx := context.Background() - var cancelFn func() - ctx, cancelFn = context.WithTimeout(ctx, config.AwsApiTimeout) - defer cancelFn() - - inputCtx, cancelInputCtx := context.WithCancel(ctx) + inputCtx, cancelInputCtx := context.WithCancel(context.Background()) go func() { defer cancelInputCtx() select { @@ -223,7 +216,6 @@ func (p *s3Input) Stop() { p.stopOnce.Do(func() { defer p.outlet.Close() close(p.close) - p.inputCtx.Done() p.logger.Info("Stopping s3 input") }) } @@ -319,7 +311,12 @@ func (p *s3Input) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeout in WaitTimeSeconds: &waitTimeSecond, }) - return req.Send(p.inputCtx) + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsApiTimeout) + defer cancelFn() + + return req.Send(ctx) } func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int64, svcSQS sqsiface.ClientAPI, receiptHandle *string) error { @@ -328,7 +325,13 @@ func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int VisibilityTimeout: &visibilityTimeout, ReceiptHandle: receiptHandle, }) - _, err := req.Send(p.inputCtx) + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsApiTimeout) + defer cancelFn() + + _, err := req.Send(ctx) return err } @@ -367,116 +370,85 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) { } func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC chan error) error { - s3Context := &s3Context{ + s3Ctx := &s3Context{ refs: 1, errC: errC, } - defer s3Context.done() + defer s3Ctx.done() + + for _, info := range s3Infos { + err := p.createEventsFromS3Info(svc, info, s3Ctx) + if err != nil { + err = errors.Wrapf(err, "createEventsFromS3Info failed for %v", info.key) + p.logger.Error(err) + s3Ctx.setError(err) + } + } + return nil +} + +func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3Ctx *s3Context) error { + objectHash := s3ObjectHash(info) - for _, s3Info := range s3Infos { - objectHash := s3ObjectHash(s3Info) + // read from s3 object + reader, err := p.newS3BucketReader(svc, info) + if err != nil { + err = errors.Wrap(err, "getS3ObjectResponse failed") + p.logger.Error(err) + return err + } - // read from s3 object - resp, err := p.getS3ObjectResponse(svc, s3Info) + // Decode JSON documents when expand_event_list_from_field is given in config + if p.config.ExpandEventListFromField != "" { + decoder := json.NewDecoder(reader) + err := p.decodeJSONWithKey(decoder, objectHash, info, s3Ctx) if err != nil { - err = errors.Wrap(err, "newS3BucketReader failed") + err = errors.Wrap(err, "decodeJSONWithKey failed") p.logger.Error(err) - s3Context.setError(err) return err } + return nil + } - reader := bufio.NewReader(resp.Body) - // Check content-type - if resp.ContentType != nil { - switch *resp.ContentType { - case "application/x-gzip": - gzipReader, err := gzip.NewReader(resp.Body) - if err != nil { - err = errors.Wrapf(err, "Failed to decompress application/x-gzip file %v", s3Info.key) - p.logger.Error(err) - s3Context.setError(err) - resp.Body.Close() - return err - } - reader = bufio.NewReader(gzipReader) - gzipReader.Close() - } - } else if strings.HasSuffix(s3Info.key, ".gz") { - // If there is no content-type, check file name instead. - gzipReader, err := gzip.NewReader(resp.Body) - if err != nil { - err = errors.Wrapf(err, "Failed to decompress file with .gz suffix %v", s3Info.key) - p.logger.Error(err) - s3Context.setError(err) - resp.Body.Close() - return err - } - reader = bufio.NewReader(gzipReader) - gzipReader.Close() + // handle s3 objects that are not json content-type + offset := 0 + for { + log, err := reader.ReadString('\n') + if log == "" { + break } - // Decode JSON documents when expand_event_list_from_field is given in config - if p.config.ExpandEventListFromField != "" { - decoder := json.NewDecoder(reader) - err := p.decodeJSONWithKey(decoder, objectHash, s3Info, s3Context) + if err == io.EOF { + // create event for last line + offset += len([]byte(log)) + event := createEvent(log, offset, info, objectHash, s3Ctx) + err = p.forwardEvent(event) if err != nil { - err = errors.Wrapf(err, "decodeJSONWithKey failed for %v", s3Info.key) + err = errors.Wrap(err, "forwardEvent failed") p.logger.Error(err) - s3Context.setError(err) - resp.Body.Close() return err } - resp.Body.Close() return nil + } else if err != nil { + err = errors.Wrap(err, "ReadString failed") + p.logger.Error(err) + return err } - // handle s3 objects that are not json content-type - offset := 0 - for { - log, err := reader.ReadString('\n') - if log == "" { - break - } - - if err == io.EOF { - // create event for last line - offset += len([]byte(log)) - event := createEvent(log, offset, s3Info, objectHash, s3Context) - err = p.forwardEvent(event) - if err != nil { - err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - p.logger.Error(err) - s3Context.setError(err) - resp.Body.Close() - return err - } - resp.Body.Close() - return nil - } else if err != nil { - err = errors.Wrapf(err, "ReadString failed for %v", s3Info.key) - p.logger.Error(err) - s3Context.setError(err) - resp.Body.Close() - return err - } - - // create event per log line - offset += len([]byte(log)) - event := createEvent(log, offset, s3Info, objectHash, s3Context) - err = p.forwardEvent(event) - if err != nil { - err = errors.Wrapf(err, "forwardEvent failed for %v", s3Info.key) - p.logger.Error(err) - s3Context.setError(err) - resp.Body.Close() - return err - } + // create event per log line + offset += len([]byte(log)) + event := createEvent(log, offset, info, objectHash, s3Ctx) + err = p.forwardEvent(event) + if err != nil { + err = errors.Wrap(err, "forwardEvent failed") + p.logger.Error(err) + return err } } return nil } -func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Context *s3Context) error { +func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { offset := 0 for { var jsonFields map[string][]interface{} @@ -490,15 +462,15 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 // get logs from expand_event_list_from_field textValues, ok := jsonFields[p.config.ExpandEventListFromField] if !ok { - err = errors.Wrapf(err, fmt.Sprintf("Key '%s' not found", p.config.ExpandEventListFromField)) + err = errors.Wrapf(err, fmt.Sprintf("key '%s' not found", p.config.ExpandEventListFromField)) p.logger.Error(err) return err } for _, v := range textValues { - err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Context) + err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx) if err != nil { - err = errors.Wrapf(err, fmt.Sprintf("convertJSONToEvent failed for %v", s3Info.key)) + err = errors.Wrap(err, "convertJSONToEvent failed") p.logger.Error(err) return err } @@ -517,7 +489,7 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 } for _, v := range textValues { - err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Context) + err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx) if err != nil { err = errors.Wrapf(err, fmt.Sprintf("Key '%s' not found", p.config.ExpandEventListFromField)) p.logger.Error(err) @@ -527,49 +499,68 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 } } -func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Context *s3Context) error { +func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { vJSON, err := json.Marshal(jsonFields) log := string(vJSON) offset += len([]byte(log)) - event := createEvent(log, offset, s3Info, objectHash, s3Context) + event := createEvent(log, offset, s3Info, objectHash, s3Ctx) err = p.forwardEvent(event) if err != nil { - err = errors.Wrapf(err, fmt.Sprintf("forwardEvent failed for %s", s3Info.key)) + err = errors.Wrap(err, fmt.Sprintf("forwardEvent failed")) p.logger.Error(err) return err } return nil } -func (p *s3Input) getS3ObjectResponse(svc s3iface.ClientAPI, s3Info s3Info) (*s3.GetObjectResponse, error) { - // Download the S3 object using GetObjectRequest. The Context will interrupt - // the request if the timeout expires. +func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, info s3Info) (*bufio.Reader, error) { + // Download the S3 object using GetObjectRequest. s3GetObjectInput := &s3.GetObjectInput{ - Bucket: awssdk.String(s3Info.name), - Key: awssdk.String(s3Info.key), + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), } req := svc.GetObjectRequest(s3GetObjectInput) - resp, err := req.Send(p.inputCtx) + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsApiTimeout) + defer cancelFn() + + resp, err := req.Send(ctx) if err != nil { if awsErr, ok := err.(awserr.Error); ok { // If the SDK can determine the request or retry delay was canceled // by a context the ErrCodeRequestCanceled error will be returned. if awsErr.Code() == awssdk.ErrCodeRequestCanceled { - err = errors.Wrapf(err, "GetObject of s3 file with key %v failed due to timeout", s3Info.key) + err = errors.Wrap(err, "GetObject request canceled") p.logger.Error(err) return nil, err } if awsErr.Code() == "NoSuchKey" { - p.logger.Warn("Cannot find s3 file with key ", s3Info.key) + p.logger.Warn("Cannot find s3 file") return nil, nil } } - return nil, errors.Wrapf(err, "s3 get object request failed %v", s3Info.key) + return nil, errors.Wrap(err, "s3 get object request failed") + } + + defer resp.Body.Close() + + reader := bufio.NewReader(resp.Body) + // Check content-type + if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") { + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + err = errors.Wrap(err, "gzip.NewReader failed") + p.logger.Error(err) + return nil, err + } + reader = bufio.NewReader(gzipReader) + gzipReader.Close() } - return resp, nil + return reader, nil } func (p *s3Input) forwardEvent(event beat.Event) error { @@ -587,7 +578,12 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s } req := svcSQS.DeleteMessageRequest(deleteMessageInput) - ctx := context.Background() + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsApiTimeout) + defer cancelFn() + _, err := req.Send(ctx) if err != nil { if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == awssdk.ErrCodeRequestCanceled { @@ -598,33 +594,33 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s return nil } -func createEvent(log string, offset int, s3Info s3Info, objectHash string, s3Context *s3Context) beat.Event { +func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { f := common.MapStr{ "message": log, "log": common.MapStr{ "offset": int64(offset), - "file.path": constructObjectURL(s3Info), + "file.path": constructObjectURL(info), }, "aws": common.MapStr{ "s3": common.MapStr{ "bucket": common.MapStr{ - "name": s3Info.name, - "arn": s3Info.arn}, - "object.key": s3Info.key, + "name": info.name, + "arn": info.arn}, + "object.key": info.key, }, }, "cloud": common.MapStr{ "provider": "aws", - "region": s3Info.region, + "region": info.region, }, } - s3Context.Inc() + s3Ctx.Inc() return beat.Event{ Timestamp: time.Now(), Fields: f, Meta: common.MapStr{"id": objectHash + "-" + fmt.Sprintf("%012d", offset)}, - Private: s3Context, + Private: s3Ctx, } } diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 169859129be..d8b374537a3 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -5,7 +5,6 @@ package s3 import ( - "bufio" "bytes" "context" "fmt" @@ -28,16 +27,10 @@ type MockS3Client struct { s3iface.ClientAPI } -// MockS3ClientErr struct is used for unit tests. -type MockS3ClientErr struct { - s3iface.ClientAPI -} - var ( s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png \n" s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png \n" mockSvc = &MockS3Client{} - mockSvcErr = &MockS3ClientErr{} info = s3Info{ name: "test-s3-ks", key: "log2019-06-21-16-16-54", @@ -58,16 +51,6 @@ func (m *MockS3Client) GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRe } } -func (m *MockS3ClientErr) GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRequest { - httpReq, _ := http.NewRequest("", "", nil) - return s3.GetObjectRequest{ - Request: &awssdk.Request{ - Data: &s3.GetObjectOutput{}, - HTTPRequest: httpReq, - }, - } -} - func TestGetRegionFromQueueURL(t *testing.T) { queueURL := "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs" regionName, err := getRegionFromQueueURL(queueURL) @@ -152,8 +135,7 @@ func TestHandleMessage(t *testing.T) { func TestGetS3ObjectResponse(t *testing.T) { p := &s3Input{inputCtx: context.Background()} - resp, err := p.getS3ObjectResponse(mockSvc, info) - reader := bufio.NewReader(resp.Body) + reader, err := p.newS3BucketReader(mockSvc, info) assert.NoError(t, err) for i := 0; i < 3; i++ { switch i { @@ -190,8 +172,8 @@ func TestCreateEvent(t *testing.T) { } s3ObjectHash := s3ObjectHash(s3Info) - resp, err := p.getS3ObjectResponse(mockSvc, s3Info) - reader := bufio.NewReader(resp.Body) + reader, err := p.newS3BucketReader(mockSvc, s3Info) + assert.NoError(t, err) var events []beat.Event for { From 1c63a8f755c1c010c9cdd0c5d43d7fe6ec92b6a2 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 23 Jan 2020 16:30:08 -0700 Subject: [PATCH 14/20] Change AwsApiTimeout to AwsAPITimeout --- x-pack/filebeat/input/s3/config.go | 8 ++++---- x-pack/filebeat/input/s3/input.go | 2 +- x-pack/filebeat/input/s3/input_test.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/input/s3/config.go b/x-pack/filebeat/input/s3/config.go index fa672ab90d1..dab77ceaecb 100644 --- a/x-pack/filebeat/input/s3/config.go +++ b/x-pack/filebeat/input/s3/config.go @@ -18,7 +18,7 @@ type config struct { VisibilityTimeout time.Duration `config:"visibility_timeout"` AwsConfig awscommon.ConfigAWS `config:",inline"` ExpandEventListFromField string `config:"expand_event_list_from_field"` - AwsApiTimeout time.Duration `config:"aws_api_timeout"` + AwsAPITimeout time.Duration `config:"aws_api_timeout"` } func defaultConfig() config { @@ -27,7 +27,7 @@ func defaultConfig() config { Type: "s3", }, VisibilityTimeout: 300 * time.Second, - AwsApiTimeout: 120 * time.Second, + AwsAPITimeout: 120 * time.Second, } } @@ -36,9 +36,9 @@ func (c *config) Validate() error { return fmt.Errorf("visibility timeout %v is not within the "+ "required range 0s to 12h", c.VisibilityTimeout) } - if c.AwsApiTimeout < 0 || c.AwsApiTimeout > c.VisibilityTimeout/2 { + if c.AwsAPITimeout < 0 || c.AwsAPITimeout > c.VisibilityTimeout/2 { return fmt.Errorf("aws api timeout %v needs to be larger than"+ - " 0s and smaller than half of the visibility timeout", c.AwsApiTimeout) + " 0s and smaller than half of the visibility timeout", c.AwsAPITimeout) } return nil } diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 03b8a4d2669..499d14580fb 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -167,7 +167,7 @@ func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) p.logger.Infof("visibility timeout is set to %v seconds: ", visibilityTimeout) - p.logger.Infof("aws api timeout is set to %v: ", p.config.AwsApiTimeout) + p.logger.Infof("aws api timeout is set to %v: ", p.config.AwsAPITimeout) regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index d8b374537a3..4b37a78ab63 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -133,7 +133,7 @@ func TestHandleMessage(t *testing.T) { } -func TestGetS3ObjectResponse(t *testing.T) { +func TestNewS3BucketReader(t *testing.T) { p := &s3Input{inputCtx: context.Background()} reader, err := p.newS3BucketReader(mockSvc, info) assert.NoError(t, err) From 47b6fbe77cea2060de9d1232ed26e7427800a462 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 23 Jan 2020 16:33:35 -0700 Subject: [PATCH 15/20] fix typo --- x-pack/filebeat/input/s3/input.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 499d14580fb..2984e2e4e94 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -313,7 +313,7 @@ func (p *s3Input) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeout in // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsApiTimeout) + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) defer cancelFn() return req.Send(ctx) @@ -328,7 +328,7 @@ func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsApiTimeout) + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) defer cancelFn() _, err := req.Send(ctx) @@ -524,7 +524,7 @@ func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, info s3Info) (*bufio. // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsApiTimeout) + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) defer cancelFn() resp, err := req.Send(ctx) @@ -581,7 +581,7 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsApiTimeout) + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) defer cancelFn() _, err := req.Send(ctx) From 8b7c544910b383d440d841b82f66367624ccf587 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 23 Jan 2020 20:58:43 -0700 Subject: [PATCH 16/20] Fix aws_api_timeout name in configs --- x-pack/filebeat/filebeat.reference.yml | 16 +-- x-pack/filebeat/input/s3/input.go | 97 ++++++++----------- x-pack/filebeat/input/s3/input_test.go | 32 +++++- x-pack/filebeat/module/aws/_meta/config.yml | 16 +-- .../module/aws/cloudtrail/manifest.yml | 2 + x-pack/filebeat/module/aws/elb/manifest.yml | 2 + .../filebeat/module/aws/s3access/manifest.yml | 2 + .../filebeat/module/aws/vpcflow/manifest.yml | 2 + x-pack/filebeat/modules.d/aws.yml.disabled | 16 +-- 9 files changed, 105 insertions(+), 80 deletions(-) diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index d4ffe976b56..2894c6a0cc0 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -115,9 +115,9 @@ filebeat.modules: # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s elb: enabled: false @@ -138,9 +138,9 @@ filebeat.modules: # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s vpcflow: enabled: false @@ -161,9 +161,9 @@ filebeat.modules: # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s cloudtrail: enabled: false @@ -184,9 +184,9 @@ filebeat.modules: # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s #-------------------------------- Azure Module -------------------------------- - module: azure diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 2984e2e4e94..5fdd534eff7 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -390,12 +390,50 @@ func (p *s3Input) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, errC func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3Ctx *s3Context) error { objectHash := s3ObjectHash(info) - // read from s3 object - reader, err := p.newS3BucketReader(svc, info) + // Download the S3 object using GetObjectRequest. + s3GetObjectInput := &s3.GetObjectInput{ + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), + } + req := svc.GetObjectRequest(s3GetObjectInput) + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + defer cancelFn() + + resp, err := req.Send(ctx) if err != nil { - err = errors.Wrap(err, "getS3ObjectResponse failed") - p.logger.Error(err) - return err + if awsErr, ok := err.(awserr.Error); ok { + // If the SDK can determine the request or retry delay was canceled + // by a context the ErrCodeRequestCanceled error will be returned. + if awsErr.Code() == awssdk.ErrCodeRequestCanceled { + err = errors.Wrap(err, "GetObject request canceled") + p.logger.Error(err) + return err + } + + if awsErr.Code() == "NoSuchKey" { + p.logger.Warn("Cannot find s3 file") + return nil + } + } + return errors.Wrap(err, "s3 get object request failed") + } + + defer resp.Body.Close() + + reader := bufio.NewReader(resp.Body) + // Check content-type + if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") { + gzipReader, err := gzip.NewReader(resp.Body) + if err != nil { + err = errors.Wrap(err, "gzip.NewReader failed") + p.logger.Error(err) + return err + } + reader = bufio.NewReader(gzipReader) + gzipReader.Close() } // Decode JSON documents when expand_event_list_from_field is given in config @@ -514,55 +552,6 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH return nil } -func (p *s3Input) newS3BucketReader(svc s3iface.ClientAPI, info s3Info) (*bufio.Reader, error) { - // Download the S3 object using GetObjectRequest. - s3GetObjectInput := &s3.GetObjectInput{ - Bucket: awssdk.String(info.name), - Key: awssdk.String(info.key), - } - req := svc.GetObjectRequest(s3GetObjectInput) - - // The Context will interrupt the request if the timeout expires. - var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) - defer cancelFn() - - resp, err := req.Send(ctx) - if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - // If the SDK can determine the request or retry delay was canceled - // by a context the ErrCodeRequestCanceled error will be returned. - if awsErr.Code() == awssdk.ErrCodeRequestCanceled { - err = errors.Wrap(err, "GetObject request canceled") - p.logger.Error(err) - return nil, err - } - - if awsErr.Code() == "NoSuchKey" { - p.logger.Warn("Cannot find s3 file") - return nil, nil - } - } - return nil, errors.Wrap(err, "s3 get object request failed") - } - - defer resp.Body.Close() - - reader := bufio.NewReader(resp.Body) - // Check content-type - if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") { - gzipReader, err := gzip.NewReader(resp.Body) - if err != nil { - err = errors.Wrap(err, "gzip.NewReader failed") - p.logger.Error(err) - return nil, err - } - reader = bufio.NewReader(gzipReader) - gzipReader.Close() - } - return reader, nil -} - func (p *s3Input) forwardEvent(event beat.Event) error { ok := p.outlet.OnEvent(event) if !ok { diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 4b37a78ab63..cc22afeb810 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -5,6 +5,7 @@ package s3 import ( + "bufio" "bytes" "context" "fmt" @@ -135,8 +136,22 @@ func TestHandleMessage(t *testing.T) { func TestNewS3BucketReader(t *testing.T) { p := &s3Input{inputCtx: context.Background()} - reader, err := p.newS3BucketReader(mockSvc, info) + s3GetObjectInput := &s3.GetObjectInput{ + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), + } + req := mockSvc.GetObjectRequest(s3GetObjectInput) + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + defer cancelFn() + + resp, err := req.Send(ctx) assert.NoError(t, err) + reader := bufio.NewReader(resp.Body) + resp.Body.Close() + for i := 0; i < 3; i++ { switch i { case 0: @@ -172,9 +187,22 @@ func TestCreateEvent(t *testing.T) { } s3ObjectHash := s3ObjectHash(s3Info) - reader, err := p.newS3BucketReader(mockSvc, s3Info) + s3GetObjectInput := &s3.GetObjectInput{ + Bucket: awssdk.String(info.name), + Key: awssdk.String(info.key), + } + req := mockSvc.GetObjectRequest(s3GetObjectInput) + + // The Context will interrupt the request if the timeout expires. + var cancelFn func() + ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + defer cancelFn() + resp, err := req.Send(ctx) assert.NoError(t, err) + reader := bufio.NewReader(resp.Body) + resp.Body.Close() + var events []beat.Event for { log, err := reader.ReadString('\n') diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index bbb25f39918..ac397e7e50c 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -18,9 +18,9 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s elb: enabled: false @@ -41,9 +41,9 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s vpcflow: enabled: false @@ -64,9 +64,9 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s cloudtrail: enabled: false @@ -87,6 +87,6 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s diff --git a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml index 4865624045e..9228bc6c25d 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml @@ -5,6 +5,8 @@ var: default: s3 - name: shared_credential_file - name: credential_profile_name + - name: visibility_timeout + - name: aws_api_timeout ingest_pipeline: ingest/pipeline.yml input: config/cloudtrail.yml diff --git a/x-pack/filebeat/module/aws/elb/manifest.yml b/x-pack/filebeat/module/aws/elb/manifest.yml index ca83ac2a315..43a948ec28a 100644 --- a/x-pack/filebeat/module/aws/elb/manifest.yml +++ b/x-pack/filebeat/module/aws/elb/manifest.yml @@ -5,6 +5,8 @@ var: default: s3 - name: shared_credential_file - name: credential_profile_name + - name: visibility_timeout + - name: aws_api_timeout ingest_pipeline: ingest/pipeline.yml input: config/{{.input}}.yml diff --git a/x-pack/filebeat/module/aws/s3access/manifest.yml b/x-pack/filebeat/module/aws/s3access/manifest.yml index 20c0ce4efc7..18f1da8de35 100644 --- a/x-pack/filebeat/module/aws/s3access/manifest.yml +++ b/x-pack/filebeat/module/aws/s3access/manifest.yml @@ -5,6 +5,8 @@ var: default: s3 - name: shared_credential_file - name: credential_profile_name + - name: visibility_timeout + - name: aws_api_timeout ingest_pipeline: ingest/pipeline.yml input: config/{{.input}}.yml diff --git a/x-pack/filebeat/module/aws/vpcflow/manifest.yml b/x-pack/filebeat/module/aws/vpcflow/manifest.yml index 9e047a606eb..d046ba82201 100644 --- a/x-pack/filebeat/module/aws/vpcflow/manifest.yml +++ b/x-pack/filebeat/module/aws/vpcflow/manifest.yml @@ -5,6 +5,8 @@ var: default: s3 - name: shared_credential_file - name: credential_profile_name + - name: visibility_timeout + - name: aws_api_timeout ingest_pipeline: ingest/pipeline.yml input: config/input.yml diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index 40fa9ba0917..6370850294b 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -21,9 +21,9 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s elb: enabled: false @@ -44,9 +44,9 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s vpcflow: enabled: false @@ -67,9 +67,9 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s cloudtrail: enabled: false @@ -90,6 +90,6 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before GetObject request will be interrupted by context + # Maximum duration before AWS API request will be interrupted by context # Default to be 120s - #var.context_timeout: 120s + #var.aws_api_timeout: 120s From 2b85538a9d55333e0d3c2a2ee51d4236b1473f86 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 24 Jan 2020 08:13:43 -0700 Subject: [PATCH 17/20] Change back to use channelContext --- .../docs/inputs/input-aws-s3.asciidoc | 2 +- x-pack/filebeat/input/s3/input.go | 42 +++++++++++-------- x-pack/filebeat/input/s3/input_test.go | 10 ++--- x-pack/filebeat/module/aws/_meta/config.yml | 8 ++-- 4 files changed, 34 insertions(+), 28 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index c2d08fd3850..774e15e48bf 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -47,7 +47,7 @@ URL of the AWS SQS queue that messages will be received from. Required. The duration that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. -This value needs to be a lot bigger than Filebeat collection frequency so +This value needs to be a lot bigger than {beatname_uc} collection frequency so if it took too long to read the s3 log, this sqs message will not be reprocessed. The default visibility timeout for a message is 300 seconds. The minimum is 0 seconds. The maximum is 12 hours. diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 5fdd534eff7..583196e16f0 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -65,7 +65,7 @@ type s3Input struct { logger *logp.Logger close chan struct{} workerOnce sync.Once // Guarantees that the worker goroutine is only started once. - inputCtx context.Context // Wraps the Done channel from parent input.Context. + context *channelContext workerWg sync.WaitGroup // Waits on s3 worker goroutine. stopOnce sync.Once } @@ -107,6 +107,23 @@ type s3Context struct { errC chan error } +// channelContext implements context.Context by wrapping a channel +type channelContext struct { + done <-chan struct{} +} + +func (c *channelContext) Deadline() (time.Time, bool) { return time.Time{}, false } +func (c *channelContext) Done() <-chan struct{} { return c.done } +func (c *channelContext) Err() error { + select { + case <-c.done: + return context.Canceled + default: + return nil + } +} +func (c *channelContext) Value(key interface{}) interface{} { return nil } + // NewInput creates a new s3 input func NewInput(cfg *common.Config, connector channel.Connector, inputContext input.Context) (input.Input, error) { cfgwarn.Beta("s3 input type is used") @@ -140,24 +157,13 @@ func NewInput(cfg *common.Config, connector channel.Connector, inputContext inpu closeChannel := make(chan struct{}) - // Wrap input.Context's Done channel with a context.Context. This goroutine - // stops with the parent closes the Done channel. - inputCtx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Done: - case <-inputCtx.Done(): - } - }() - p := &s3Input{ outlet: out, config: config, awsConfig: awsConfig, logger: logger, close: closeChannel, - inputCtx: inputCtx, + context: &channelContext{closeChannel}, } return p, nil } @@ -189,7 +195,7 @@ func (p *s3Input) run(svcSQS sqsiface.ClientAPI, svcS3 s3iface.ClientAPI, visibi defer p.logger.Infof("s3 input worker for '%v' has stopped.", p.config.QueueURL) p.logger.Infof("s3 input worker has started. with queueURL: %v", p.config.QueueURL) - for p.inputCtx.Err() == nil { + for p.context.Err() == nil { // receive messages from sqs output, err := p.receiveMessage(svcSQS, visibilityTimeout) if err != nil { @@ -313,7 +319,7 @@ func (p *s3Input) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeout in // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() return req.Send(ctx) @@ -328,7 +334,7 @@ func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() _, err := req.Send(ctx) @@ -399,7 +405,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() resp, err := req.Send(ctx) @@ -570,7 +576,7 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() _, err := req.Send(ctx) diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index cc22afeb810..9c77a415c81 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -135,7 +135,7 @@ func TestHandleMessage(t *testing.T) { } func TestNewS3BucketReader(t *testing.T) { - p := &s3Input{inputCtx: context.Background()} + p := &s3Input{context: &channelContext{}} s3GetObjectInput := &s3.GetObjectInput{ Bucket: awssdk.String(info.name), Key: awssdk.String(info.key), @@ -144,13 +144,13 @@ func TestNewS3BucketReader(t *testing.T) { // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() resp, err := req.Send(ctx) assert.NoError(t, err) reader := bufio.NewReader(resp.Body) - resp.Body.Close() + defer resp.Body.Close() for i := 0; i < 3; i++ { switch i { @@ -171,7 +171,7 @@ func TestNewS3BucketReader(t *testing.T) { } func TestCreateEvent(t *testing.T) { - p := &s3Input{inputCtx: context.Background()} + p := &s3Input{context: &channelContext{}} errC := make(chan error) s3Context := &s3Context{ refs: 1, @@ -195,7 +195,7 @@ func TestCreateEvent(t *testing.T) { // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.inputCtx, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() resp, err := req.Send(ctx) diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index ac397e7e50c..314182bc241 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -18,7 +18,7 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -41,7 +41,7 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -64,7 +64,7 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -87,6 +87,6 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s From cbb92e74461d5ec7046257b45c1b9a1763f29361 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 24 Jan 2020 09:29:57 -0700 Subject: [PATCH 18/20] Remove predefine cancelFn --- x-pack/filebeat/filebeat.reference.yml | 8 ++++---- x-pack/filebeat/input/s3/input.go | 13 ++++--------- x-pack/filebeat/modules.d/aws.yml.disabled | 8 ++++---- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 2894c6a0cc0..a39e658a6f0 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -115,7 +115,7 @@ filebeat.modules: # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -138,7 +138,7 @@ filebeat.modules: # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -161,7 +161,7 @@ filebeat.modules: # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -184,7 +184,7 @@ filebeat.modules: # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 583196e16f0..12d53556094 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -64,9 +64,9 @@ type s3Input struct { awsConfig awssdk.Config logger *logp.Logger close chan struct{} - workerOnce sync.Once // Guarantees that the worker goroutine is only started once. + workerOnce sync.Once // Guarantees that the worker goroutine is only started once. context *channelContext - workerWg sync.WaitGroup // Waits on s3 worker goroutine. + workerWg sync.WaitGroup // Waits on s3 worker goroutine. stopOnce sync.Once } @@ -125,7 +125,7 @@ func (c *channelContext) Err() error { func (c *channelContext) Value(key interface{}) interface{} { return nil } // NewInput creates a new s3 input -func NewInput(cfg *common.Config, connector channel.Connector, inputContext input.Context) (input.Input, error) { +func NewInput(cfg *common.Config, connector channel.Connector, context input.Context) (input.Input, error) { cfgwarn.Beta("s3 input type is used") logger := logp.NewLogger(inputName) @@ -136,7 +136,7 @@ func NewInput(cfg *common.Config, connector channel.Connector, inputContext inpu out, err := connector.ConnectWith(cfg, beat.ClientConfig{ Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, + DynamicFields: context.DynamicFields, }, ACKEvents: func(privates []interface{}) { for _, private := range privates { @@ -156,7 +156,6 @@ func NewInput(cfg *common.Config, connector channel.Connector, inputContext inpu } closeChannel := make(chan struct{}) - p := &s3Input{ outlet: out, config: config, @@ -318,7 +317,6 @@ func (p *s3Input) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeout in }) // The Context will interrupt the request if the timeout expires. - var cancelFn func() ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() @@ -333,7 +331,6 @@ func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int }) // The Context will interrupt the request if the timeout expires. - var cancelFn func() ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() @@ -404,7 +401,6 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C req := svc.GetObjectRequest(s3GetObjectInput) // The Context will interrupt the request if the timeout expires. - var cancelFn func() ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() @@ -575,7 +571,6 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s req := svcSQS.DeleteMessageRequest(deleteMessageInput) // The Context will interrupt the request if the timeout expires. - var cancelFn func() ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) defer cancelFn() diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index 6370850294b..618e70940f6 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -21,7 +21,7 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -44,7 +44,7 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -67,7 +67,7 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s @@ -90,6 +90,6 @@ # Default to be 300s #var.visibility_timeout: 300s - # Maximum duration before AWS API request will be interrupted by context + # Maximum duration before AWS API request will be interrupted # Default to be 120s #var.aws_api_timeout: 120s From f4f8cbd25b49d979c3c60dc0832103dd936c64ac Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 24 Jan 2020 10:33:37 -0700 Subject: [PATCH 19/20] improve info message --- x-pack/filebeat/input/s3/input.go | 4 ++-- x-pack/filebeat/input/s3/input_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 12d53556094..f8280afda3c 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -171,8 +171,8 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) - p.logger.Infof("visibility timeout is set to %v seconds: ", visibilityTimeout) - p.logger.Infof("aws api timeout is set to %v: ", p.config.AwsAPITimeout) + p.logger.Infof("visibility timeout is set to %v seconds", visibilityTimeout) + p.logger.Infof("aws api timeout is set to %v", p.config.AwsAPITimeout) regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 9c77a415c81..88b3aa1340e 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -201,7 +201,7 @@ func TestCreateEvent(t *testing.T) { resp, err := req.Send(ctx) assert.NoError(t, err) reader := bufio.NewReader(resp.Body) - resp.Body.Close() + defer resp.Body.Close() var events []beat.Event for { From 1cafa792f2da3c65bb4440496d4f2a213ff29069 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 24 Jan 2020 15:10:08 -0700 Subject: [PATCH 20/20] Change aws_api_timeout to api_timeout --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 2 +- x-pack/filebeat/filebeat.reference.yml | 8 ++++---- x-pack/filebeat/input/s3/config.go | 10 +++++----- x-pack/filebeat/input/s3/input.go | 10 +++++----- x-pack/filebeat/input/s3/input_test.go | 4 ++-- x-pack/filebeat/module/aws/_meta/config.yml | 8 ++++---- .../module/aws/cloudtrail/config/cloudtrail.yml | 4 ++-- x-pack/filebeat/module/aws/cloudtrail/manifest.yml | 2 +- x-pack/filebeat/module/aws/elb/config/s3.yml | 4 ++-- x-pack/filebeat/module/aws/elb/manifest.yml | 2 +- x-pack/filebeat/module/aws/s3access/config/s3.yml | 4 ++-- x-pack/filebeat/module/aws/s3access/manifest.yml | 2 +- x-pack/filebeat/module/aws/vpcflow/config/input.yml | 4 ++-- x-pack/filebeat/module/aws/vpcflow/manifest.yml | 2 +- x-pack/filebeat/modules.d/aws.yml.disabled | 8 ++++---- 15 files changed, 37 insertions(+), 37 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 774e15e48bf..6715b854747 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -62,7 +62,7 @@ messages under the group value into separate events. For example, CloudTrail log are in JSON format and events are found under the JSON object "Records": [float] -==== `aws_api_timeout` +==== `api_timeout` The maximum duration of AWS API can take. If it exceeds the timeout, AWS API will be interrupted. diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index a39e658a6f0..2a31dd42877 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -117,7 +117,7 @@ filebeat.modules: # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s elb: enabled: false @@ -140,7 +140,7 @@ filebeat.modules: # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s vpcflow: enabled: false @@ -163,7 +163,7 @@ filebeat.modules: # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s cloudtrail: enabled: false @@ -186,7 +186,7 @@ filebeat.modules: # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s #-------------------------------- Azure Module -------------------------------- - module: azure diff --git a/x-pack/filebeat/input/s3/config.go b/x-pack/filebeat/input/s3/config.go index dab77ceaecb..7fa455713e9 100644 --- a/x-pack/filebeat/input/s3/config.go +++ b/x-pack/filebeat/input/s3/config.go @@ -18,7 +18,7 @@ type config struct { VisibilityTimeout time.Duration `config:"visibility_timeout"` AwsConfig awscommon.ConfigAWS `config:",inline"` ExpandEventListFromField string `config:"expand_event_list_from_field"` - AwsAPITimeout time.Duration `config:"aws_api_timeout"` + APITimeout time.Duration `config:"api_timeout"` } func defaultConfig() config { @@ -27,7 +27,7 @@ func defaultConfig() config { Type: "s3", }, VisibilityTimeout: 300 * time.Second, - AwsAPITimeout: 120 * time.Second, + APITimeout: 120 * time.Second, } } @@ -36,9 +36,9 @@ func (c *config) Validate() error { return fmt.Errorf("visibility timeout %v is not within the "+ "required range 0s to 12h", c.VisibilityTimeout) } - if c.AwsAPITimeout < 0 || c.AwsAPITimeout > c.VisibilityTimeout/2 { - return fmt.Errorf("aws api timeout %v needs to be larger than"+ - " 0s and smaller than half of the visibility timeout", c.AwsAPITimeout) + if c.APITimeout < 0 || c.APITimeout > c.VisibilityTimeout/2 { + return fmt.Errorf("api timeout %v needs to be larger than"+ + " 0s and smaller than half of the visibility timeout", c.APITimeout) } return nil } diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index f8280afda3c..dd254cffebf 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -172,7 +172,7 @@ func (p *s3Input) Run() { p.workerOnce.Do(func() { visibilityTimeout := int64(p.config.VisibilityTimeout.Seconds()) p.logger.Infof("visibility timeout is set to %v seconds", visibilityTimeout) - p.logger.Infof("aws api timeout is set to %v", p.config.AwsAPITimeout) + p.logger.Infof("aws api timeout is set to %v", p.config.APITimeout) regionName, err := getRegionFromQueueURL(p.config.QueueURL) if err != nil { @@ -317,7 +317,7 @@ func (p *s3Input) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeout in }) // The Context will interrupt the request if the timeout expires. - ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) defer cancelFn() return req.Send(ctx) @@ -331,7 +331,7 @@ func (p *s3Input) changeVisibilityTimeout(queueURL string, visibilityTimeout int }) // The Context will interrupt the request if the timeout expires. - ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) defer cancelFn() _, err := req.Send(ctx) @@ -401,7 +401,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C req := svc.GetObjectRequest(s3GetObjectInput) // The Context will interrupt the request if the timeout expires. - ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) defer cancelFn() resp, err := req.Send(ctx) @@ -571,7 +571,7 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s req := svcSQS.DeleteMessageRequest(deleteMessageInput) // The Context will interrupt the request if the timeout expires. - ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) defer cancelFn() _, err := req.Send(ctx) diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 88b3aa1340e..62d93a66e5f 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -144,7 +144,7 @@ func TestNewS3BucketReader(t *testing.T) { // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) defer cancelFn() resp, err := req.Send(ctx) @@ -195,7 +195,7 @@ func TestCreateEvent(t *testing.T) { // The Context will interrupt the request if the timeout expires. var cancelFn func() - ctx, cancelFn := context.WithTimeout(p.context, p.config.AwsAPITimeout) + ctx, cancelFn := context.WithTimeout(p.context, p.config.APITimeout) defer cancelFn() resp, err := req.Send(ctx) diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index 314182bc241..e87956f950c 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -20,7 +20,7 @@ # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s elb: enabled: false @@ -43,7 +43,7 @@ # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s vpcflow: enabled: false @@ -66,7 +66,7 @@ # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s cloudtrail: enabled: false @@ -89,4 +89,4 @@ # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s diff --git a/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml b/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml index 766b93d0427..87219497aef 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/config/cloudtrail.yml @@ -16,8 +16,8 @@ shared_credential_file: {{ .shared_credential_file }} visibility_timeout: {{ .visibility_timeout }} {{ end }} -{{ if .aws_api_timeout }} -aws_api_timeout: {{ .aws_api_timeout }} +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} {{ end }} {{ else if eq .input "file" }} diff --git a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml index 9228bc6c25d..d809dec5fb5 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml @@ -6,7 +6,7 @@ var: - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout - - name: aws_api_timeout + - name: api_timeout ingest_pipeline: ingest/pipeline.yml input: config/cloudtrail.yml diff --git a/x-pack/filebeat/module/aws/elb/config/s3.yml b/x-pack/filebeat/module/aws/elb/config/s3.yml index f9672ad185b..40212723e96 100644 --- a/x-pack/filebeat/module/aws/elb/config/s3.yml +++ b/x-pack/filebeat/module/aws/elb/config/s3.yml @@ -13,6 +13,6 @@ shared_credential_file: {{ .shared_credential_file }} visibility_timeout: {{ .visibility_timeout }} {{ end }} -{{ if .aws_api_timeout }} -aws_api_timeout: {{ .aws_api_timeout }} +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} {{ end }} diff --git a/x-pack/filebeat/module/aws/elb/manifest.yml b/x-pack/filebeat/module/aws/elb/manifest.yml index 43a948ec28a..01857d90643 100644 --- a/x-pack/filebeat/module/aws/elb/manifest.yml +++ b/x-pack/filebeat/module/aws/elb/manifest.yml @@ -6,7 +6,7 @@ var: - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout - - name: aws_api_timeout + - name: api_timeout ingest_pipeline: ingest/pipeline.yml input: config/{{.input}}.yml diff --git a/x-pack/filebeat/module/aws/s3access/config/s3.yml b/x-pack/filebeat/module/aws/s3access/config/s3.yml index f9672ad185b..40212723e96 100644 --- a/x-pack/filebeat/module/aws/s3access/config/s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/s3.yml @@ -13,6 +13,6 @@ shared_credential_file: {{ .shared_credential_file }} visibility_timeout: {{ .visibility_timeout }} {{ end }} -{{ if .aws_api_timeout }} -aws_api_timeout: {{ .aws_api_timeout }} +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} {{ end }} diff --git a/x-pack/filebeat/module/aws/s3access/manifest.yml b/x-pack/filebeat/module/aws/s3access/manifest.yml index 18f1da8de35..b4fdbb7ca49 100644 --- a/x-pack/filebeat/module/aws/s3access/manifest.yml +++ b/x-pack/filebeat/module/aws/s3access/manifest.yml @@ -6,7 +6,7 @@ var: - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout - - name: aws_api_timeout + - name: api_timeout ingest_pipeline: ingest/pipeline.yml input: config/{{.input}}.yml diff --git a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index 6264db44081..1838d695eb4 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -15,8 +15,8 @@ shared_credential_file: {{ .shared_credential_file }} visibility_timeout: {{ .visibility_timeout }} {{ end }} -{{ if .aws_api_timeout }} -aws_api_timeout: {{ .aws_api_timeout }} +{{ if .api_timeout }} +api_timeout: {{ .api_timeout }} {{ end }} {{ else if eq .input "file" }} diff --git a/x-pack/filebeat/module/aws/vpcflow/manifest.yml b/x-pack/filebeat/module/aws/vpcflow/manifest.yml index d046ba82201..e438f1e91c3 100644 --- a/x-pack/filebeat/module/aws/vpcflow/manifest.yml +++ b/x-pack/filebeat/module/aws/vpcflow/manifest.yml @@ -6,7 +6,7 @@ var: - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout - - name: aws_api_timeout + - name: api_timeout ingest_pipeline: ingest/pipeline.yml input: config/input.yml diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index 618e70940f6..689f697e023 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -23,7 +23,7 @@ # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s elb: enabled: false @@ -46,7 +46,7 @@ # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s vpcflow: enabled: false @@ -69,7 +69,7 @@ # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s cloudtrail: enabled: false @@ -92,4 +92,4 @@ # Maximum duration before AWS API request will be interrupted # Default to be 120s - #var.aws_api_timeout: 120s + #var.api_timeout: 120s