Skip to content

Commit

Permalink
S3 input: try to detect GZIPped objects (#18764) (#18937)
Browse files Browse the repository at this point in the history
* Try to detect GZIP based on content encoding header

* Check GZIP contents

* Log error before returning it

* Fixing typo

* Add comment

* Adding comment

* Adding CHANGELOG entry

* Add test case for empty contents
  • Loading branch information
ycombinator authored Jun 3, 2020
1 parent 3d9cefc commit 2974ffc
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- The `logstash` module can now automatically detect the log file format (JSON or plaintext) and process it accordingly. {issue}9964[9964] {pull}18095[18095]
- Improve ECS categorization field mappings in coredns module. {issue}16159[16159] {pull}18424[18424]
- Improve ECS categorization field mappings in envoyproxy module. {issue}16161[16161] {pull}18395[18395]
- The s3 input can now automatically detect gzipped objects. {issue}18283[18283] {pull}18764[18764]

*Heartbeat*

Expand Down
52 changes: 39 additions & 13 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -436,7 +437,25 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C

reader := bufio.NewReader(resp.Body)

// Check if expand_event_list_from_field is given with document conent-type = "application/json"
isS3ObjGzipped, err := isStreamGzipped(reader)
if err != nil {
err = errors.Wrap(err, "could not determine if S3 object is gzipped")
p.logger.Error(err)
return err
}

if isS3ObjGzipped {
gzipReader, err := gzip.NewReader(reader)
if err != nil {
err = errors.Wrapf(err, "gzip.NewReader failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
}

// Check if expand_event_list_from_field is given with document content-type = "application/json"
if resp.ContentType != nil && *resp.ContentType == "application/json" && p.config.ExpandEventListFromField == "" {
err := errors.New("expand_event_list_from_field parameter is missing in config for application/json content-type file")
p.logger.Error(err)
Expand All @@ -455,18 +474,6 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
return nil
}

// Check content-type = "application/x-gzip" or filename ends with ".gz"
if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
err = errors.Wrapf(err, "gzip.NewReader failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
}

// handle s3 objects that are not json content-type
offset := 0
for {
Expand Down Expand Up @@ -668,3 +675,22 @@ func (c *s3Context) Inc() {
defer c.mux.Unlock()
c.refs++
}

// isStreamGzipped determines whether the given stream of bytes (encapsulated in a buffered reader)
// represents gzipped content or not. A buffered reader is used so the function can peek into the byte
// stream without consuming it. This makes it convenient for code executed after this function call
// to consume the stream if it wants.
func isStreamGzipped(r *bufio.Reader) (bool, error) {
// Why 512? See https://godoc.org/net/http#DetectContentType
buf, err := r.Peek(512)
if err != nil && err != io.EOF {
return false, err
}

switch http.DetectContentType(buf) {
case "application/x-gzip", "application/zip":
return true, nil
default:
return false, nil
}
}
46 changes: 46 additions & 0 deletions x-pack/filebeat/input/s3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ package s3
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"testing"

"github.com/stretchr/testify/require"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/s3iface"
Expand Down Expand Up @@ -327,3 +330,46 @@ func TestConvertOffsetToString(t *testing.T) {
}

}

func TestIsStreamGzipped(t *testing.T) {
logBytes := []byte(`May 28 03:00:52 Shaunaks-MacBook-Pro-Work syslogd[119]: ASL Sender Statistics
May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED USE in libdispatch client: Changing the target of a source after it has been activated; set a breakpoint on _dispatch_bug_deprecated to debug
May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED USE in libdispatch client: Changing target queue hierarchy after xpc connection was activated; set a breakpoint on _dispatch_bug_deprecated to debug
`)

var b bytes.Buffer
gz := gzip.NewWriter(&b)
_, err := gz.Write(logBytes)
require.NoError(t, err)

err = gz.Close()
require.NoError(t, err)

tests := map[string]struct {
contents []byte
expected bool
}{
"not_gzipped": {
logBytes,
false,
},
"gzipped": {
b.Bytes(),
true,
},
"empty": {
[]byte{},
false,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
r := bufio.NewReader(bytes.NewReader(test.contents))
actual, err := isStreamGzipped(r)

require.NoError(t, err)
require.Equal(t, test.expected, actual)
})
}
}

0 comments on commit 2974ffc

Please sign in to comment.