Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] S3 input: try to detect GZIPped objects (#18764) #18937

Merged
merged 1 commit into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- The `logstash` module can now automatically detect the log file format (JSON or plaintext) and process it accordingly. {issue}9964[9964] {pull}18095[18095]
- Improve ECS categorization field mappings in coredns module. {issue}16159[16159] {pull}18424[18424]
- Improve ECS categorization field mappings in envoyproxy module. {issue}16161[16161] {pull}18395[18395]
- The s3 input can now automatically detect gzipped objects. {issue}18283[18283] {pull}18764[18764]

*Heartbeat*

Expand Down
52 changes: 39 additions & 13 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -436,7 +437,25 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C

reader := bufio.NewReader(resp.Body)

// Check if expand_event_list_from_field is given with document conent-type = "application/json"
isS3ObjGzipped, err := isStreamGzipped(reader)
if err != nil {
err = errors.Wrap(err, "could not determine if S3 object is gzipped")
p.logger.Error(err)
return err
}

if isS3ObjGzipped {
gzipReader, err := gzip.NewReader(reader)
if err != nil {
err = errors.Wrapf(err, "gzip.NewReader failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
}

// Check if expand_event_list_from_field is given with document content-type = "application/json"
if resp.ContentType != nil && *resp.ContentType == "application/json" && p.config.ExpandEventListFromField == "" {
err := errors.New("expand_event_list_from_field parameter is missing in config for application/json content-type file")
p.logger.Error(err)
Expand All @@ -455,18 +474,6 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
return nil
}

// Check content-type = "application/x-gzip" or filename ends with ".gz"
if (resp.ContentType != nil && *resp.ContentType == "application/x-gzip") || strings.HasSuffix(info.key, ".gz") {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
err = errors.Wrapf(err, "gzip.NewReader failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
}

// handle s3 objects that are not json content-type
offset := 0
for {
Expand Down Expand Up @@ -668,3 +675,22 @@ func (c *s3Context) Inc() {
defer c.mux.Unlock()
c.refs++
}

// isStreamGzipped determines whether the given stream of bytes (encapsulated in a buffered reader)
// represents gzipped content or not. A buffered reader is used so the function can peek into the byte
// stream without consuming it. This makes it convenient for code executed after this function call
// to consume the stream if it wants.
//
// It returns true when the sniffed content type of the peeked bytes is "application/x-gzip" or
// "application/zip", and false otherwise (including for an empty stream). A non-nil error is
// returned only when peeking fails for a reason other than the stream or the reader's buffer
// being shorter than the sniffing window.
func isStreamGzipped(r *bufio.Reader) (bool, error) {
	// Why 512? See https://godoc.org/net/http#DetectContentType
	buf, err := r.Peek(512)

	// A short peek is not a failure: io.EOF means the stream holds fewer than 512 bytes, and
	// bufio.ErrBufferFull means the reader was created with a buffer smaller than 512 bytes.
	// In both cases Peek still hands back the bytes that are available, which is all that
	// content sniffing needs.
	if err != nil && err != io.EOF && err != bufio.ErrBufferFull {
		return false, err
	}

	switch http.DetectContentType(buf) {
	// NOTE(review): "application/zip" is the ZIP archive signature, which compress/gzip is not
	// expected to decode; kept as-is for backward compatibility — confirm this is intended.
	case "application/x-gzip", "application/zip":
		return true, nil
	default:
		return false, nil
	}
}
46 changes: 46 additions & 0 deletions x-pack/filebeat/input/s3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ package s3
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"testing"

"github.com/stretchr/testify/require"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/s3iface"
Expand Down Expand Up @@ -327,3 +330,46 @@ func TestConvertOffsetToString(t *testing.T) {
}

}

// TestIsStreamGzipped verifies that isStreamGzipped reports true for gzip-compressed input and
// false, without an error, for plain-text and empty input.
func TestIsStreamGzipped(t *testing.T) {
	plainLog := []byte(`May 28 03:00:52 Shaunaks-MacBook-Pro-Work syslogd[119]: ASL Sender Statistics
May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED USE in libdispatch client: Changing the target of a source after it has been activated; set a breakpoint on _dispatch_bug_deprecated to debug
May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED USE in libdispatch client: Changing target queue hierarchy after xpc connection was activated; set a breakpoint on _dispatch_bug_deprecated to debug
`)

	// Compress the sample log lines in memory to build the gzipped fixture.
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	_, writeErr := zw.Write(plainLog)
	require.NoError(t, writeErr)

	// Closing the writer flushes the remaining compressed data into the buffer.
	closeErr := zw.Close()
	require.NoError(t, closeErr)

	type testCase struct {
		contents []byte
		expected bool
	}

	cases := map[string]testCase{
		"not_gzipped": {contents: plainLog, expected: false},
		"gzipped":     {contents: compressed.Bytes(), expected: true},
		"empty":       {contents: []byte{}, expected: false},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			stream := bufio.NewReader(bytes.NewReader(tc.contents))

			got, err := isStreamGzipped(stream)

			require.NoError(t, err)
			require.Equal(t, tc.expected, got)
		})
	}
}