Skip to content

Commit

Permalink
add timeout config for AWS SDK Go HTTP calls
Browse files Browse the repository at this point in the history
- also refactored repetitive non-negative integer config parsing based on feedback
  • Loading branch information
jpaskhay committed Feb 4, 2022
1 parent 58916f8 commit 7e1504b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ If you think you’ve found a potential security issue, please do not post it in
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature changes the behavior of the `partition_key` feature. See the KPL aggregation section below for more details.
* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed.
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.
* `http_request_timeout`: Specify a timeout (in seconds) for the underlying AWS SDK Go HTTP call when sending records to Kinesis. By default, a timeout of `0` is used, indicating no timeout. Note that even with no timeout, the default behavior of the AWS SDK Go library may still lead to an eventual timeout.

### Permissions

Expand Down
37 changes: 26 additions & 11 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
logrus.Infof("[kinesis %d] plugin parameter compression = '%s'", pluginID, compression)
replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots")
logrus.Infof("[kinesis %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots)
httpRequestTimeout := output.FLBPluginConfigKey(ctx, "http_request_timeout")
logrus.Infof("[kinesis %d] plugin parameter http_request_timeout = '%s'", pluginID, httpRequestTimeout)

if stream == "" || region == "" {
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
Expand Down Expand Up @@ -119,14 +121,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
var concurrencyInt, concurrencyRetriesInt int
var err error
if concurrency != "" {
concurrencyInt, err = strconv.Atoi(concurrency)
concurrencyInt, err = parseNonNegativeConfig("experimental_concurrency", concurrency, pluginID)
if err != nil {
logrus.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value %s specified: %v", pluginID, concurrency, err)
return nil, err
}
if concurrencyInt < 0 {
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency)
}

if concurrencyInt > maximumConcurrency {
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency)
Expand All @@ -138,12 +136,9 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
}

if concurrencyRetries != "" {
concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries)
concurrencyRetriesInt, err = parseNonNegativeConfig("experimental_concurrency_retries", concurrencyRetries, pluginID)
if err != nil {
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err)
}
if concurrencyRetriesInt < 0 {
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries)
return nil, err
}
} else {
concurrencyRetriesInt = defaultConcurrentRetries
Expand All @@ -160,7 +155,27 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression)
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)
var httpRequestTimeoutDuration time.Duration
if httpRequestTimeout != "" {
httpRequestTimeoutInt, err := parseNonNegativeConfig("http_request_timeout", httpRequestTimeout, pluginID)
if err != nil {
return nil, err
}
httpRequestTimeoutDuration = time.Duration(httpRequestTimeoutInt) * time.Second
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID, httpRequestTimeoutDuration)
}

func parseNonNegativeConfig(configName string, configValue string, pluginID int) (int, error) {
configValueInt, err := strconv.Atoi(configValue)
if err != nil {
return 0, fmt.Errorf("[kinesis %d] Invalid '%s' value (%s) specified: %v", pluginID, configName, configValue, err)
}
if configValueInt < 0 {
return 0, fmt.Errorf("[kinesis %d] Invalid '%s' value (%s) specified, must be a non-negative number", pluginID, configName, configValue)
}
return configValueInt, nil
}

// The "export" comments have syntactic meaning
Expand Down
13 changes: 10 additions & 3 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"math"
"net/http"
"os"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -114,8 +115,8 @@ type OutputPlugin struct {
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, concurrency, retryLimit int, isAggregate, appendNewline bool, compression CompressionType, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint, pluginID)
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, concurrency, retryLimit int, isAggregate, appendNewline bool, compression CompressionType, pluginID int, httpRequestTimeout time.Duration) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint, pluginID, httpRequestTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -171,7 +172,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd
}

// newPutRecordsClient creates the Kinesis client for calling the PutRecords method
func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint string, stsEndpoint string, pluginID int) (*kinesis.Kinesis, error) {
func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint string, stsEndpoint string, pluginID int, httpRequestTimeout time.Duration) (*kinesis.Kinesis, error) {
customResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
if service == endpoints.KinesisServiceID && kinesisEndpoint != "" {
return endpoints.ResolvedEndpoint{
Expand All @@ -184,12 +185,16 @@ func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint strin
}
return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
}
httpClient := &http.Client{
Timeout: httpRequestTimeout,
}

// Fetch base credentials
baseConfig := &aws.Config{
Region: aws.String(awsRegion),
EndpointResolver: endpoints.ResolverFunc(customResolverFn),
CredentialsChainVerboseErrors: aws.Bool(true),
HTTPClient: httpClient,
}

sess, err := session.NewSession(baseConfig)
Expand All @@ -206,6 +211,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint strin
creds := stscreds.NewCredentials(svcSess, eksRole)
eksConfig.Credentials = creds
eksConfig.Region = aws.String(awsRegion)
eksConfig.HTTPClient = httpClient
svcConfig = eksConfig

svcSess, err = session.NewSession(svcConfig)
Expand All @@ -219,6 +225,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, kinesisEndpoint strin
creds := stscreds.NewCredentials(svcSess, roleARN)
stsConfig.Credentials = creds
stsConfig.Region = aws.String(awsRegion)
stsConfig.HTTPClient = httpClient
svcConfig = stsConfig

svcSess, err = session.NewSession(svcConfig)
Expand Down

0 comments on commit 7e1504b

Please sign in to comment.