diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0a681c202acd..d6fe714a731d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -246,6 +246,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Prevent Elasticsearch from spewing log warnings about redundant wildcards when setting up ingest pipelines for the `elasticsearch` module. {issue}15840[15840] {pull}15900[15900] - Fix mapping error for cloudtrail additionalEventData field {pull}16088[16088] - Fix a connection error in httpjson input. {pull}16123[16123] +- Fix integer overflow in S3 offsets when collecting very large files. {pull}22523[22523] *Filebeat* diff --git a/x-pack/filebeat/input/s3/collector.go b/x-pack/filebeat/input/s3/collector.go index c3d3114c723b..52c6056b0494 100644 --- a/x-pack/filebeat/input/s3/collector.go +++ b/x-pack/filebeat/input/s3/collector.go @@ -367,12 +367,12 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, } // handle s3 objects that are not json content-type - offset := 0 + var offset int64 for { log, err := readStringAndTrimDelimiter(reader) if err == io.EOF { // create event for last line - offset += len([]byte(log)) + offset += int64(len(log)) event := createEvent(log, offset, info, objectHash, s3Ctx) err = c.forwardEvent(event) if err != nil { @@ -390,7 +390,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, } // create event per log line - offset += len([]byte(log)) + offset += int64(len(log)) event := createEvent(log, offset, info, objectHash, s3Ctx) err = c.forwardEvent(event) if err != nil { @@ -402,7 +402,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, } func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { - offset := 0 + var offset int64 for { var jsonFields interface{} err := decoder.Decode(&jsonFields) @@ -430,7 +430,7 @@ func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Inf } } -func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) { +func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int64, error) { switch f := jsonFields.(type) { case map[string][]interface{}: if s3Info.expandEventListFromField != "" { @@ -483,11 +483,11 @@ func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int, objectH return offset, nil } -func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) { +func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int64, error) { vJSON, _ := json.Marshal(jsonFields) logOriginal := string(vJSON) log := trimLogDelimiter(logOriginal) - offset += len([]byte(log)) + offset += int64(len(log)) event := createEvent(log, offset, s3Info, objectHash, s3Ctx) err := c.forwardEvent(event) @@ -538,7 +538,7 @@ func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) { return trimLogDelimiter(logOriginal), nil } -func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { +func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { s3Ctx.Inc() event := beat.Event{ @@ -566,11 +566,15 @@ func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx * }, Private: s3Ctx, } - event.SetID(objectHash + "-" + fmt.Sprintf("%012d", offset)) + event.SetID(objectID(objectHash, offset)) return event } +func objectID(objectHash string, offset int64) string { + return fmt.Sprintf("%s-%012d", objectHash, offset) +} + func constructObjectURL(info s3Info) string { return "https://" + info.name + ".s3-" + info.region + ".amazonaws.com/" + info.key } diff --git a/x-pack/filebeat/input/s3/collector_test.go b/x-pack/filebeat/input/s3/collector_test.go index 510f94d40d56..b039e2d06b94 100644 --- a/x-pack/filebeat/input/s3/collector_test.go +++ b/x-pack/filebeat/input/s3/collector_test.go @@ -9,7 +9,6 @@ import ( "bytes" "compress/gzip" "context" - "fmt" "io" "io/ioutil" "net/http" @@ -242,12 +241,12 @@ func TestCreateEvent(t *testing.T) { break } if err == io.EOF { - event := createEvent(log, len([]byte(log)), s3Info, s3ObjectHash, s3Context) + event := createEvent(log, int64(len(log)), s3Info, s3ObjectHash, s3Context) events = append(events, event) break } - event := createEvent(log, len([]byte(log)), s3Info, s3ObjectHash, s3Context) + event := createEvent(log, int64(len(log)), s3Info, s3ObjectHash, s3Context) events = append(events, event) } @@ -311,27 +310,29 @@ func TestConstructObjectURL(t *testing.T) { } } -func TestConvertOffsetToString(t *testing.T) { - cases := []struct { - offset int - expectedString string +func TestCreateObjectID(t *testing.T) { + cases := map[string]struct { + offset int64 + want string }{ - { + "object1": { 123, - "000000000123", + "object1-000000000123", }, - { + "object2": { 123456, - "000000123456", + "object2-000000123456", }, - { + "object3": { 123456789123, - "123456789123", + "object3-123456789123", }, } - for _, c := range cases { - output := fmt.Sprintf("%012d", c.offset) - assert.Equal(t, c.expectedString, output) + for name, c := range cases { + t.Run(name, func(t *testing.T) { + id := objectID(name, c.offset) + assert.Equal(t, c.want, id) + }) } }