Kinesis compressed metrics #5588

Closed
6 changes: 3 additions & 3 deletions Gopkg.lock


4 changes: 4 additions & 0 deletions Gopkg.toml
@@ -265,3 +265,7 @@
[[constraint]]
name = "github.com/go-logfmt/logfmt"
version = "0.4.0"

[[constraint]]
name = "github.com/golang/snappy"
revision = "2a8bb927dd31d8daada140a5d09578521ce5c36a"
61 changes: 60 additions & 1 deletion plugins/outputs/kinesis/README.md
@@ -1,4 +1,4 @@
## Amazon Kinesis Output for Telegraf
# Amazon Kinesis Output for Telegraf

This is an experimental plugin that is still in the early stages of development. It will batch up all of the Points
into one Put request to Kinesis, which should considerably reduce the number of API requests.
@@ -13,16 +13,42 @@ maybe useful for users to review Amazon's official documentation which is available

This plugin uses a credential chain for authentication with the Kinesis API endpoint. The plugin will attempt to
authenticate in the following order (a configuration sketch follows the list).

1. Assumed credentials via STS if `role_arn` attribute is specified (source credentials are evaluated from subsequent rules)
2. Explicit credentials from `access_key`, `secret_key`, and `token` attributes
3. Shared profile from `profile` attribute
4. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#environment-variables)
5. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#shared-credentials-file)
6. [EC2 Instance Profile](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
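
For example, an assume-role setup (rule 1, with its source credentials coming from a shared profile, rule 3) might look like the sketch below. This is illustrative only; the role ARN and profile name are placeholders.

```toml
[[outputs.kinesis]]
  region = "eu-west-1"
  streamname = "KinesisStreamName"
  ## Assume this role before writing to Kinesis (placeholder ARN).
  role_arn = "arn:aws:iam::123456789012:role/telegraf-kinesis-writer"
  ## Source credentials for the STS call come from this shared profile (placeholder name).
  profile = "telegraf"
```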

## Example Configuration

```toml
access_key = "AWSKEYVALUE"
secret_key = "AWSSecretKeyValue"
region = "eu-west-1"
streamname = "KinesisStreamName"
aggregate_metrics = true
# Either "gzip" or "snappy"
compress_metrics_with = "gzip"
partition = { method = "random" }
debug = true
```

## Config

### AWS Configuration

The following AWS configuration variables are available and map directly to the standard AWS settings. If you don't know what they are, you most likely don't need to change them.

* access_key
* secret_key
* role_arn
* profile
* shared_credential_file
* token
* endpoint_url

For this output plugin to function correctly the following variables must be configured.

* region
@@ -31,6 +31,7 @@ For this output plugin to function correctly the following variables must be con
### region

The region is the Amazon region that you wish to connect to. Examples include, but are not limited to:

* us-west-1
* us-west-2
* us-east-1
@@ -89,3 +116,35 @@ String is defined using the default Point.String() value and translated to []byte
#### custom

Custom is a string defined by a number of values in the FormatMetric() function.

### aggregate_metrics

This will make the plugin gather the metrics and send them as blocks of metrics in Kinesis records. The number of put requests depends on a few factors:

1. If a random partition key is in use, a block is created for each shard in the stream; if there are fewer metrics than shards, one block is created per metric (see the sketch after this list).
1. Each record will be at most 1020 KB in size, plus the partition key.
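
The sizing behaviour can be pictured with the following sketch. This is illustrative only, not the code in this PR; `splitIntoBlocks`, `maxBlockSize`, and the packing details are assumptions (a real implementation would also need a delimiter or length prefix between metrics).

```go
package kinesis

// maxBlockSize is the assumed per-record budget: 1020 KB, leaving headroom
// for the partition key within the Kinesis record limit.
const maxBlockSize = 1020 * 1024

// splitIntoBlocks starts with one payload per shard and doubles the payload
// count until every payload fits under maxBlockSize.
func splitIntoBlocks(metrics [][]byte, shards int) [][]byte {
	if len(metrics) < shards {
		// Fewer metrics than shards: one payload per metric.
		return metrics
	}
	for blocks := shards; ; blocks *= 2 {
		payloads := make([][]byte, blocks)
		fits := true
		for i, m := range metrics {
			p := i % blocks // round-robin fill so all shards stay busy
			payloads[p] = append(payloads[p], m...)
			if len(payloads[p]) > maxBlockSize {
				fits = false
				break
			}
		}
		if fits {
			return payloads
		}
		if blocks >= len(metrics) {
			// A single metric exceeds the limit; fall back to one per payload.
			return metrics
		}
	}
}
```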

### compress_metrics_with

`compress_metrics_with` accepts the following values. If no value is set, compression is skipped.

* gzip
* snappy

They are explained below.

#### gzip

This will make the plugin compress the data using gzip before it is shipped to Kinesis.
Gzip is slower than Snappy but generally fast enough, and it gives much better compression. Use gzip in most cases.

If both gzip and snappy are enabled, gzip wins.
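
Tying the setting to the helpers added in this PR, the selection could look like the sketch below. `gzipMetrics` and `snappyMetrics` are the functions from `compression.go` in this change set; the `compress` wrapper and how it is plumbed into the plugin are assumptions.

```go
// compress applies the configured codec; an empty setting skips compression.
// Sketch only: assumes it lives in the same package as compression.go, with fmt imported.
func compress(compressWith string, payload []byte) ([]byte, error) {
	switch compressWith {
	case "gzip":
		return gzipMetrics(payload)
	case "snappy":
		return snappyMetrics(payload)
	case "":
		return payload, nil // no compression requested
	default:
		return nil, fmt.Errorf("unknown compress_metrics_with value %q", compressWith)
	}
}
```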

Reviewer: What if I set both false, do I still get aggregated but uncompressed metrics?

Author: Yes, that is correct. The payloads would be bigger, so you don't get the saving in bandwidth, but you do get the saving in how many put requests you make.

Reviewer: Nice, this is awesome stuff. Just getting the metrics rolled up into one payload would give you a saving.

Author: The rollup depends on the partition key type.

Random: if there are more metrics than shards, make as many payloads as there are available shards and fill them. If they are too large (greater than 1020 KB), make 2 × shards payloads and repeat the process. It does this because we want to use all available shards with random partition keys. If there are fewer metrics than shards, we just create a payload for each metric. That is one metric per payload, but if the volume is that low, it shouldn't matter.

All others: start with one payload and, if it doesn't fit into a block of 1020 KB, double the number of payloads. Compression happens at the end.

Compression is done as a second step, so you get your saving in puts and then further savings in bandwidth. Each payload is compressed after being made 1020 KB or less, so it will always fit in the put request.

As a side note, my tests with 4 shards available have shown the following:

Put Records (Count), sum:

* Original: 22,500 puts per minute
* Aggregated and compressed: 240 puts per minute

Incoming Data (Bytes), sum:

* Original: 5.5 MB per minute average
* Aggregated and compressed: 470 KB per minute average

That is roughly a 99% reduction in put requests and over 90% less incoming data.


#### snappy

This will make the plugin compress the data using Google's Snappy compression before it is shipped to Kinesis.
Snappy is much quicker than gzip; use it if compressing and writing takes too long to complete before the next flush interval.

### debug

Prints debugging data into the logs.
37 changes: 37 additions & 0 deletions plugins/outputs/kinesis/compression.go
@@ -0,0 +1,37 @@
package kinesis

import (
	"bytes"
	"compress/gzip"
	"fmt"

	"github.com/golang/snappy"
)

var (
	// gzipCompressionLevel sets the compression level. Tests indicate that 7 gives the best
	// trade-off between speed and compression.
	gzipCompressionLevel = 7
)

func gzipMetrics(metrics []byte) ([]byte, error) {
	var buffer bytes.Buffer

	gzw, err := gzip.NewWriterLevel(&buffer, gzipCompressionLevel)
	if err != nil {
		return nil, fmt.Errorf("invalid gzip compression level %d: %v", gzipCompressionLevel, err)
	}
	if _, err := gzw.Write(metrics); err != nil {
		return nil, fmt.Errorf("writing to the gzip writer failed: %v", err)
	}
	if err := gzw.Close(); err != nil {
		return nil, fmt.Errorf("closing the gzip writer failed: %v", err)
	}

	return buffer.Bytes(), nil
}

// snappyMetrics cannot fail; the error return keeps the signature consistent with gzipMetrics.
func snappyMetrics(metrics []byte) ([]byte, error) {
	return snappy.Encode(nil, metrics), nil
}
38 changes: 38 additions & 0 deletions plugins/outputs/kinesis/compression_test.go
@@ -0,0 +1,38 @@
package kinesis

import (
	"testing"
	"time"
)

func TestGoodCompression(t *testing.T) {
	tests := []string{
		"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
		time.Now().String(),
		`abcdefghijklmnopqrstuvwzyz1234567890@~|\/?><@~#+=!"£$%^&_*(){}[]`,
	}

	for _, test := range tests {
		if _, err := gzipMetrics([]byte(test)); err != nil {
			t.Errorf("Failed to gzip test data: %v", err)
		}

		// Snappy doesn't return errors, so we can only look for panics.
		snappyMetrics([]byte(test))
	}
}

func TestBadGzipCompressionLevel(t *testing.T) {
	oldLevel := gzipCompressionLevel
	gzipCompressionLevel = 11 // valid gzip levels run from -2 (huffman only) to 9
	defer func() { gzipCompressionLevel = oldLevel }()

	if _, err := gzipMetrics([]byte(time.Now().String())); err == nil {
		t.Error("Expected gzip to fail because of a bad compression level")
	}
}