Skip to content

Commit

Permalink
Add multiline test
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman committed May 14, 2021
1 parent 85c7985 commit da10563
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
16 changes: 13 additions & 3 deletions x-pack/filebeat/input/awss3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
continue
}
if fs.Regex.MatchString(filename) {
s3Infos = append(s3Infos, s3Info{
info := s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
Expand All @@ -308,9 +308,19 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
lineTerminator: fs.LineTerminator,
encoding: fs.Encoding,
bufferSize: fs.BufferSize,
})
break
}
if info.bufferSize == 0 {
info.bufferSize = c.config.BufferSize
}
if info.maxBytes == 0 {
info.maxBytes = c.config.MaxBytes
}
if info.lineTerminator == 0 {
info.lineTerminator = c.config.LineTerminator
}
s3Infos = append(s3Infos, info)
}
break
}
}
return s3Infos, nil
Expand Down
8 changes: 8 additions & 0 deletions x-pack/filebeat/input/awss3/ftest/sample2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<Event><Data>
A
B
C</Data></Event>
<Event><Data>
D
E
F</Data</Event>
59 changes: 42 additions & 17 deletions x-pack/filebeat/input/awss3/s3_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"context"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
Expand All @@ -33,12 +33,11 @@ import (
)

const (
fileName = "sample1.txt"
fileName1 = "sample1.txt"
fileName2 = "sample2.txt"
visibilityTimeout = 300 * time.Second
)

var filePath = filepath.Join("ftest", fileName)

// GetConfigForTest function gets aws credentials for integration tests.
func getConfigForTest(t *testing.T) config {
t.Helper()
Expand Down Expand Up @@ -77,8 +76,23 @@ func getConfigForTest(t *testing.T) config {
}

func defaultTestConfig() *common.Config {
return common.MustNewConfigFrom(map[string]interface{}{
return common.MustNewConfigFrom(common.MapStr{
"queue_url": os.Getenv("QUEUE_URL"),
"file_selectors": []common.MapStr{
{
"regex": strings.Replace(fileName1, ".", "\\.", -1),
"max_bytes": 4096,
},
{
"regex": strings.Replace(fileName2, ".", "\\.", -1),
"max_bytes": 4096,
"multiline": common.MapStr{
"pattern": "^<Event",
"negate": true,
"match": "after",
},
},
},
})
}

Expand Down Expand Up @@ -157,18 +171,29 @@ func TestS3Input(t *testing.T) {
collector.run()
}()

event := <-receiver
bucketName, err := event.GetValue("aws.s3.bucket.name")
assert.NoError(t, err)
assert.Equal(t, s3BucketNameEnv, bucketName)

objectKey, err := event.GetValue("aws.s3.object.key")
assert.NoError(t, err)
assert.Equal(t, fileName, objectKey)

message, err := event.GetValue("message")
assert.NoError(t, err)
assert.Equal(t, "logline1\n", message)
for i := 0; i < 4; i++ {
event := <-receiver
bucketName, err := event.GetValue("aws.s3.bucket.name")
assert.NoError(t, err)
assert.Equal(t, s3BucketNameEnv, bucketName)

objectKey, err := event.GetValue("aws.s3.object.key")
assert.NoError(t, err)

switch objectKey {
case fileName1:
message, err := event.GetValue("message")
assert.NoError(t, err)
assert.Contains(t, message, "logline")
case fileName2:
message, err := event.GetValue("message")
assert.NoError(t, err)
assert.Contains(t, message, "<Event>")
assert.Contains(t, message, "</Event>")
default:
t.Fatalf("object key %s is unknown", objectKey)
}
}
})
}

Expand Down

0 comments on commit da10563

Please sign in to comment.