-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add timeout config for AWS SDK Go HTTP calls #179
Conversation
README.md
Outdated
@@ -25,6 +25,7 @@ If you think you’ve found a potential security issue, please do not post it in | |||
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature isn't compatible with the `partition_key` feature. See the KPL aggregation section below for more details. | |||
* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed. | |||
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced. | |||
* `timeout`: Specify a timeout (in seconds) for the underlying AWS SDK Go HTTP call when sending records to Kinesis. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rename this. Maybe http_request_timeout
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, will do. i'll update the associated vars as well, similar to your other comment
fluent-bit-kinesis.go
Outdated
@@ -160,7 +162,21 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, | |||
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression) | |||
} | |||
|
|||
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID) | |||
var timeoutInt int | |||
var timeoutDur time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rename this variable. Maybe httpTimeoutDurationSeconds
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I prefer to declare the variables in the beginning of the function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I prefer to declare the variables in the beginning of the function.
Ya, I was mostly trying to keep it consistent with the other configs that had additional parsing/validation. If you want this changed, do you prefer the other config vars (e.g. compressions, concurrency) also get moved to the top?
It may also be possible to avoid some of the var declarations based on your comment around extracting the int parsing into a function 🤔 . I can play around with it a bit when looking at that change.
fluent-bit-kinesis.go
Outdated
timeoutInt, err = strconv.Atoi(timeout) | ||
if err != nil { | ||
logrus.Errorf("[kinesis %d] Invalid 'timeout' value %s specified: %v", pluginID, timeout, err) | ||
return nil, err | ||
} | ||
if timeoutInt < 0 { | ||
return nil, fmt.Errorf("[kinesis %d] Invalid 'timeout' value (%s) specified, must be a non-negative number", pluginID, timeout) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can extract these 7 lines into a function. Because we are doing similar things for other configs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, it will definitely clean some things up. I also just noticed this carried over an inconsistency around the error logging that can be cleaned up as part of the extraction (duplicate logging in case of error).
Thanks again for your initial review @hossain-rayhan! I pushed an initial set of changes. Let me know if you want the refactoring taken further -- I was erring on trying to minimize this PR's impact on existing code. It may be possible to replace the |
LGTM as long as it is fully tested. Also @jpaskhay, you might need to rebase the code as this branch is out-of-date. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM, left small comments.
if err != nil { | ||
return 0, fmt.Errorf("[kinesis %d] Invalid '%s' value (%s) specified: %v", pluginID, configName, configValue, err) | ||
} | ||
if configValueInt < 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the expected behavior is configValueInt == 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it depends on the individual configs and their subsequent usage. this was just a refactor of some existing repetitive code based on feedback.
side note: looks like there is an existing bug where setting the experimental_concurrency_retries
to 0
when using experimental_concurrency > 0
will swallow records without actually sending to kinesis since this for-loop condition will never be satisfied and the Flush
call is inside it:
for tries = 0; tries < outputPlugin.concurrencyRetryLimit; tries++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the fix here is to change the condition to <=
? @zhonghui12
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the fix here is to change the condition to
<=
? @zhonghui12
ya that seems like it would be the simplest fix
README.md
Outdated
@@ -25,6 +25,7 @@ If you think you’ve found a potential security issue, please do not post it in | |||
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature isn't compatible with the `partition_key` feature. See the KPL aggregation section below for more details. | |||
* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed. | |||
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced. | |||
* `http_request_timeout`: Specify a timeout (in seconds) for the underlying AWS SDK Go HTTP call when sending records to Kinesis. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we explain what 0 means for timeout config if 0 is also a valid value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, can update this. the only caveat is that although a value of 0
is supposed to represent no timeout, there is still some sort of timeout mechanism that occurs within the AWS SDK Go at 5m30s (mentioned in the linked Issue this PR is trying to address). guessing it's related to the DefaultRetryer
behavior but i wasn't familiar enough with the SDK to see how it all tied together. the caveat would likely be worth mentioning in some way for accuracy, but i may not be able to speak to its specifics as accurately
@jpaskhay this PR is out-of-date to merge. You might need to rebase it before we can merge it. Also, @PettitWesley, it seems that team has reviewed this PR before, could you please take a look at this one before we merge it? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhonghui12 Can you test this out to be safe? You can run it in a container with network mode none or turn your internet off to make all network requests fail.
@jpaskhay , I am trying to test you change locally and could you tell me which Also, you may need to combine your commits into one before we can merge your code. |
hey @zhonghui12 , we used sure, i can rebase. sorry i was being lazy and just clicking the |
Thanks for these details @jpaskhay. I will test it tomorrow and let you know. Today I tried |
c2ed5b4
to
287a73f
Compare
sounds good. been a while since i paired w/ a colleague on it, so was having trouble recalling all of the details. i think this blurb from the SDK issue covers the most important parts of what we observed:
i'm not sure if you'll be able to replicate the exact scenario we had, but hopefully you should still be able to see the different timeouts (read and context cancellation) in your test setup. the empty response body with 200 OK seemed to be the cause of the SDK hanging and treating it as a timeout for our original issue. reading through the issues i linked in the SDK report again, it's likely just a result of the point in the SDK code flow when network/connection issues occur (i.e. response body read) |
@jpaskhay you are right.. so I really cannot reproduce it but I've verified that So could you please clear up the code a little bit and resolve the conflict? We have new changes merged so it seems that your code has some conflicts. And it would be great if you could test it again and attach some output logs for us. Thanks. |
f71bd34
to
194c214
Compare
ok, i resolved the conflict. sorry for the extra force push -- didn't notice my IDE had auto-formatted the file 🤦, so had to revert that.
just to clarify, was there anything else to address or were you just referring to the conflict? sure, let me put together a simpler MVP for retesting this. it should be a little easier than how we were testing before. should be able to do so some time tomorrow. i believe in previous tests the timeout was observed on a colleague's laptop, so hopefully it can still be reproduced reliably. also this discussion wasn't resolved. were you folks wanting that addressed in this PR or a separate issue/PR? not sure if @PettitWesley wanted to proceed with his suggested solution. |
@jpaskhay Thanks.
Yeah, I just referred to the conflict. |
So we are fine with addressing it in this PR as long as it is a separate commit. Very Appreciate it if you would like to fix it for us. Thanks |
sure, will do. can probably throw in a test case for that one too. i probably won't get to this and rerunning the tests till over the weekend or first thing monday. thanks again for pushing this forward! |
c9d475f
to
198bddd
Compare
Ok, I have a test that will be running overnight. Hopefully it will encounter a timeout, and I can scrape some relevant logs for it to post here. Worst case I may need to create a VM in our problematic network zone to try reproducing there. In the meantime, I rebased to this morning's changes and have pushed up the retry handling fix along with a corresponding test. While trying to add the test case, I ran into an issue that made me notice the existing test cases w/ the mock client weren't actually verifying the calls (they were all missing the deferred Finish call). You can observe this by changing one of the non-concurrent test methods in mainline code to add The existing |
Hey @zhonghui12 , unfortunately we've been unable to reproduce the issue. At the time this PR was submitted, we were able to reproduce the timeout issue within a few minutes of the start of a test run. We don't recall if it was only in our busier network zone, or if it also occurred on laptops and other network zones (sorry been a few months now since we troubleshooted). So perhaps our network zone and/or the kinesis service itself have improved since then. I didn't see anything obvious in the recent SDK release notes related to our issue, but maybe there's something indirect that helped there too. We've tried test runs spanning anywhere from 3h-9h on laptops, different network zones, etc. and we haven't observed the issue. Obviously that's a good thing overall, but it would still be nice to have this option available as that seems to be the most common work-around in case it reappears for us or others. Looks like I need to rebase again. Thanks again for all your help, and let me know if there's anything else we can do for this. |
- also refactored repetitive non-negative integer config parsing based on feedback
… messages - added missing Controller.Finish calls to tests using mock so expectations are actually checked - added WaitGroup to test cases that invoke goroutines to ensure they complete before test method exits - added test to protect against bugfix case (panics without bugfix)
198bddd
to
1df964f
Compare
@zhonghui12 Can we merge this one soon? |
@PettitWesley yeah I think so. I am good to merge this PR if team thinks we can make this change even the origin problem cannot be reproduced. |
@zhonghui12 Yea I am fine to merge it. The change is straightforward. For core Fluent Bit plugins, there are many network settings including timeout, so it makes sense to have this for the go plugin as well. Please merge it. |
Hi @jpaskhay, |
Fixes #178
Description of changes:
This PR adds a new configuration to customize the timeout in the HTTP client being used by the AWS SDK Go
PutRecords
calls. It is based on guidance specified in the SDK docs. All other default settings in the HTTP client being used will stay the same.It has been tested and verified via local builds/patches (both specifying a config and leaving it out to default to existing behavior).
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.