From 2974ffc15c3efb63bfb9ad2abd627a3e43a74015 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 3 Jun 2020 10:26:04 -0700 Subject: [PATCH] S3 input: try to detect GZIPped objects (#18764) (#18937) * Try to detect GZIP based on content encoding header * Check GZIP contents * Log error before returning it * Fixing typo * Add comment * Adding comment * Adding CHANGELOG entry * Add test case for empty contents --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/s3/input.go | 52 +++++++++++++++++++------- x-pack/filebeat/input/s3/input_test.go | 46 +++++++++++++++++++++++ 3 files changed, 86 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 97db7e44f6d0..f8ab731739ef 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -429,6 +429,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - The `logstash` module can now automatically detect the log file format (JSON or plaintext) and process it accordingly. {issue}9964[9964] {pull}18095[18095] - Improve ECS categorization field mappings in coredns module. {issue}16159[16159] {pull}18424[18424] - Improve ECS categorization field mappings in envoyproxy module. {issue}16161[16161] {pull}18395[18395] +- The s3 input can now automatically detect gzipped objects. {issue}18283[18283] {pull}18764[18764] *Heartbeat* diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index 980bc8961708..e3a0972f5b66 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -13,6 +13,7 @@ import ( "encoding/json" "fmt" "io" + "net/http" "net/url" "strings" "sync" @@ -436,7 +437,25 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C reader := bufio.NewReader(resp.Body) - // Check if expand_event_list_from_field is given with document conent-type = "application/json" + isS3ObjGzipped, err := isStreamGzipped(reader) + if err != nil { + err = errors.Wrap(err, "could not determine if S3 object is gzipped") + p.logger.Error(err) + return err + } + + if isS3ObjGzipped { + gzipReader, err := gzip.NewReader(reader) + if err != nil { + err = errors.Wrapf(err, "gzip.NewReader failed for '%s' from S3 bucket '%s'", info.key, info.name) + p.logger.Error(err) + return err + } + reader = bufio.NewReader(gzipReader) + gzipReader.Close() + } + + // Check if expand_event_list_from_field is given with document content-type = "application/json" if resp.ContentType != nil && *resp.ContentType == "application/json" && p.config.ExpandEventListFromField == "" { err := errors.New("expand_event_list_from_field parameter is missing in config for application/json content-type file") p.logger.Error(err) @@ -455,18 +474,6 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C return nil } - // Check content-type = "application/x-gzip" or filename ends with ".gz" - if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") { - gzipReader, err := gzip.NewReader(resp.Body) - if err != nil { - err = errors.Wrapf(err, "gzip.NewReader failed for '%s' from S3 bucket '%s'", info.key, info.name) - p.logger.Error(err) - return err - } - reader = bufio.NewReader(gzipReader) - gzipReader.Close() - } - // handle s3 objects that are not json content-type offset := 0 for { @@ -668,3 +675,22 @@ func (c *s3Context) Inc() { defer c.mux.Unlock() c.refs++ } + +// isStreamGzipped determines whether the given stream of bytes (encapsulated in a buffered reader) +// represents gzipped content or not. A buffered reader is used so the function can peek into the byte +// stream without consuming it. This makes it convenient for code executed after this function call +// to consume the stream if it wants. +func isStreamGzipped(r *bufio.Reader) (bool, error) { + // Why 512? See https://godoc.org/net/http#DetectContentType + buf, err := r.Peek(512) + if err != nil && err != io.EOF { + return false, err + } + + switch http.DetectContentType(buf) { + case "application/x-gzip", "application/zip": + return true, nil + default: + return false, nil + } +} diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 4d5ca16acf83..5eddbaad9564 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -7,6 +7,7 @@ package s3 import ( "bufio" "bytes" + "compress/gzip" "context" "fmt" "io" @@ -14,6 +15,8 @@ import ( "net/http" "testing" + "github.com/stretchr/testify/require" + awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/s3iface" @@ -327,3 +330,46 @@ func TestConvertOffsetToString(t *testing.T) { } } + +func TestIsStreamGzipped(t *testing.T) { + logBytes := []byte(`May 28 03:00:52 Shaunaks-MacBook-Pro-Work syslogd[119]: ASL Sender Statistics +May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED USE in libdispatch client: Changing the target of a source after it has been activated; set a breakpoint on _dispatch_bug_deprecated to debug +May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED USE in libdispatch client: Changing target queue hierarchy after xpc connection was activated; set a breakpoint on _dispatch_bug_deprecated to debug +`) + + var b bytes.Buffer + gz := gzip.NewWriter(&b) + _, err := gz.Write(logBytes) + require.NoError(t, err) + + err = gz.Close() + require.NoError(t, err) + + tests := map[string]struct { + contents []byte + expected bool + }{ + "not_gzipped": { + logBytes, + false, + }, + "gzipped": { + b.Bytes(), + true, + }, + "empty": { + []byte{}, + false, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + r := bufio.NewReader(bytes.NewReader(test.contents)) + actual, err := isStreamGzipped(r) + + require.NoError(t, err) + require.Equal(t, test.expected, actual) + }) + } +}