-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
outputs.kinesis - log record error count #8817
outputs.kinesis - log record error count #8817
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤝 ✅ CLA has been signed. Thank you!
resp, err := k.svc.PutRecords(payload) | ||
if err != nil { | ||
log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error()) | ||
return time.Since(start) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the request failed, then resp
will not be set
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you saying resp needs a nil check in addition to an err check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I'm just giving helpful insight into how the AWS APIs tend to work.
} | ||
var failed = *resp.FailedRecordCount | ||
if failed > 0 { | ||
log.Printf("E! kinesis: Unable to write %+v of %+v record(s) to Kinesis", failed, len(r)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously it was silently dropping metrics
2021-02-05T22:39:24Z D! [outputs.kinesis] Wrote batch of 1000 metrics in 1.725811502s
2021-02-05T22:39:24Z D! [outputs.kinesis] Buffer fullness: 33 / 1000 metrics
2021-02-05T22:39:24Z E! kinesis: Unable to write 250 of 500 record(s) to Kinesis
2021-02-05T22:39:24Z D! Wrote a 500 point batch to Kinesis in 578.672453ms.
2021-02-05T22:39:24Z D! [outputs.kinesis] Wrote batch of 1000 metrics in 1.888566606s
2021-02-05T22:39:24Z D! [outputs.kinesis] Buffer fullness: 33 / 1000 metrics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be great if someone updated this plugin to use
Log telegraf.Logger `toml:"-"`
then the code could use k.Log.Debug()
without conditionals, etc.
tests can set the logger with testutil.Logger{}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to do a few more PRs after this one. Be glad to tackle that for you.
func (m *mockKinesisPutRecords) AssertRequests( | ||
assert *assert.Assertions, | ||
expected []*kinesis.PutRecordsInput, | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not overly exciting assertions for this PR, but this will be helpful for testing batching behavior in future PRs. The current test coverage is lacking, so if I can set a good precedent. :)
b05945a
to
c7b44e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change around logs and errors looks good. I've left a few comments with minor things and questions around the kinesis api mock.
CHANGELOG.md
Outdated
@@ -2,6 +2,7 @@ | |||
|
|||
#### Bugfixes | |||
|
|||
- [#8817](https://github.com/influxdata/telegraf/pull/8817) `outputs.kinesis` Log when PutRecords unsuccessfully processes records |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please remove this line? We update the changelog at the time when prs make it into a release/patch.
plugins/outputs/kinesis/kinesis.go
Outdated
if err != nil { | ||
log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error()) | ||
} | ||
var failed = *resp.FailedRecordCount |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could be just short var declaration instead so failed := &resp.FailedRecordCount{}
plugins/outputs/kinesis/kinesis.go
Outdated
@@ -154,26 +155,33 @@ func (k *KinesisOutput) SetSerializer(serializer serializers.Serializer) { | |||
k.serializer = serializer | |||
} | |||
|
|||
func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration { | |||
func (k *KinesisOutput) mockKinesisService(svc kinesisiface.KinesisAPI) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about adding a mockservice into the plugin code. Is there a way to mock this just within the test?
…record-error-count
…g-record-error-count
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good, thanks for doing that so quickly!
@helenosheaa, Are you waiting on anything from me for this PR? |
@JeffAshton no looks good, happy to merge. Would be great if you could change this in your next pr to use telegraf.Logger instead :) |
Step 1 for Issue #8818 - outputs.kinesis silently drops metrics
Required for all PRs: