Skip to content

Commit

Permalink
S3 offsets should use 64bit integers (#22523)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Nov 10, 2020
1 parent 28ad00b commit 9c41b67
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Prevent Elasticsearch from spewing log warnings about redundant wildcards when setting up ingest pipelines for the `elasticsearch` module. {issue}15840[15840] {pull}15900[15900]
- Fix mapping error for cloudtrail additionalEventData field {pull}16088[16088]
- Fix a connection error in httpjson input. {pull}16123[16123]
- Fix integer overflow in S3 offsets when collecting very large files. {pull}22523[22523]

*Filebeat*

Expand Down
22 changes: 13 additions & 9 deletions x-pack/filebeat/input/s3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,12 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
}

// handle s3 objects that are not json content-type
offset := 0
var offset int64
for {
log, err := readStringAndTrimDelimiter(reader)
if err == io.EOF {
// create event for last line
offset += len([]byte(log))
offset += int64(len(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
Expand All @@ -390,7 +390,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
}

// create event per log line
offset += len([]byte(log))
offset += int64(len(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
Expand All @@ -402,7 +402,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
}

func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
offset := 0
var offset int64
for {
var jsonFields interface{}
err := decoder.Decode(&jsonFields)
Expand Down Expand Up @@ -430,7 +430,7 @@ func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Inf
}
}

func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int64, error) {
switch f := jsonFields.(type) {
case map[string][]interface{}:
if s3Info.expandEventListFromField != "" {
Expand Down Expand Up @@ -483,11 +483,11 @@ func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int, objectH
return offset, nil
}

func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int64, error) {
vJSON, _ := json.Marshal(jsonFields)
logOriginal := string(vJSON)
log := trimLogDelimiter(logOriginal)
offset += len([]byte(log))
offset += int64(len(log))
event := createEvent(log, offset, s3Info, objectHash, s3Ctx)

err := c.forwardEvent(event)
Expand Down Expand Up @@ -538,7 +538,7 @@ func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) {
return trimLogDelimiter(logOriginal), nil
}

func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
s3Ctx.Inc()

event := beat.Event{
Expand Down Expand Up @@ -566,11 +566,15 @@ func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *
},
Private: s3Ctx,
}
event.SetID(objectHash + "-" + fmt.Sprintf("%012d", offset))
event.SetID(objectID(objectHash, offset))

return event
}

func objectID(objectHash string, offset int64) string {
return fmt.Sprintf("%s-%012d", objectHash, offset)
}

func constructObjectURL(info s3Info) string {
return "https://" + info.name + ".s3-" + info.region + ".amazonaws.com/" + info.key
}
Expand Down
33 changes: 17 additions & 16 deletions x-pack/filebeat/input/s3/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -242,12 +241,12 @@ func TestCreateEvent(t *testing.T) {
break
}
if err == io.EOF {
event := createEvent(log, len([]byte(log)), s3Info, s3ObjectHash, s3Context)
event := createEvent(log, int64(len(log)), s3Info, s3ObjectHash, s3Context)
events = append(events, event)
break
}

event := createEvent(log, len([]byte(log)), s3Info, s3ObjectHash, s3Context)
event := createEvent(log, int64(len(log)), s3Info, s3ObjectHash, s3Context)
events = append(events, event)
}

Expand Down Expand Up @@ -311,27 +310,29 @@ func TestConstructObjectURL(t *testing.T) {
}
}

func TestConvertOffsetToString(t *testing.T) {
cases := []struct {
offset int
expectedString string
func TestCreateObjectID(t *testing.T) {
cases := map[string]struct {
offset int64
want string
}{
{
"object1": {
123,
"000000000123",
"object1-000000000123",
},
{
"object2": {
123456,
"000000123456",
"object2-000000123456",
},
{
"object3": {
123456789123,
"123456789123",
"object3-123456789123",
},
}
for _, c := range cases {
output := fmt.Sprintf("%012d", c.offset)
assert.Equal(t, c.expectedString, output)
for name, c := range cases {
t.Run(name, func(t *testing.T) {
id := objectID(name, c.offset)
assert.Equal(t, c.want, id)
})
}

}
Expand Down

0 comments on commit 9c41b67

Please sign in to comment.