Skip to content

Commit

Permalink
fix bug where using concurrency feature with 0 retries could drop all…
Browse files Browse the repository at this point in the history
… messages

- added missing Controller.Finish calls to tests using mock so expectations are actually checked
- added WaitGroup to test cases that invoke goroutines to ensure they complete before test method exits
- added test to protect against bugfix case (panics without bugfix)
  • Loading branch information
jpaskhay committed Feb 4, 2022
1 parent 7e1504b commit 1df964f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
2 changes: 1 addition & 1 deletion kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis
currentRetries := outputPlugin.getConcurrentRetries()
outputPlugin.addGoroutineCount(1)

for tries = 0; tries < outputPlugin.concurrencyRetryLimit; tries++ {
for tries = 0; tries <= outputPlugin.concurrencyRetryLimit; tries++ {
if currentRetries > 0 {
// Wait if other goroutines are retrying, as well as implement a progressive backoff
if currentRetries > uint32(outputPlugin.concurrencyRetryLimit) {
Expand Down
56 changes: 52 additions & 4 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"math/rand"
"os"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -116,6 +117,7 @@ func TestAddRecordAndFlush(t *testing.T) {
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)

mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
Expand All @@ -140,6 +142,7 @@ func TestAddRecordAndFlushAggregate(t *testing.T) {
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)

mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
Expand Down Expand Up @@ -170,11 +173,20 @@ func TestAddRecordWithConcurrency(t *testing.T) {
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)

mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
FailedRecordCount: aws.Int64(0),
}, nil)
// Need to use synchronization to ensure goroutine completes before test method exits
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()

mockKinesis.EXPECT().PutRecords(gomock.Any()).DoAndReturn(
func(arg0 *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
wg.Done()
return &kinesis.PutRecordsOutput{
FailedRecordCount: aws.Int64(0),
}, nil
})

outputPlugin, _ := newMockOutputPlugin(mockKinesis, false)
// Enable concurrency
Expand All @@ -188,6 +200,42 @@ func TestAddRecordWithConcurrency(t *testing.T) {
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
}

func TestAddRecordWithConcurrencyNoRetries(t *testing.T) {
records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)

record := map[interface{}]interface{}{
"testkey": []byte("test value"),
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)
// Need to use synchronization to ensure goroutine completes before test method exits
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()

mockKinesis.EXPECT().PutRecords(gomock.Any()).DoAndReturn(
func(arg0 *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
wg.Done()
return &kinesis.PutRecordsOutput{
FailedRecordCount: aws.Int64(0),
}, nil
})

outputPlugin, _ := newMockOutputPlugin(mockKinesis, false)
// Enable concurrency but no retries
outputPlugin.Concurrency = 2
outputPlugin.concurrencyRetryLimit = 0

timeStamp := time.Now()
retCode := outputPlugin.AddRecord(&records, record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected AddRecord return code to be FLB_OK")

retCode = outputPlugin.FlushConcurrent(len(records), records)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
}

var compressors = map[string]func([]byte) ([]byte, error){
"zlib": zlibCompress,
"gzip": gzipCompress,
Expand Down

0 comments on commit 1df964f

Please sign in to comment.