Kinesis compressed metrics #5588

Closed

Conversation

@morfien101 (Contributor) commented Mar 15, 2019

Required for all PRs:

  • Signed CLA.
  • Associated README.md updated.
  • Has appropriate unit tests.

Adding toggles to allow aggregation and compression of metrics before they are sent to Kinesis.
AWS bills on both the number of requests and the bandwidth used, so we should be able to maximise usage of both.

Aggregation of metrics makes use of the batch serializers in Telegraf. The output puts the metrics into buckets that stay within the 1 MB record limit of Kinesis.

You can then further reduce usage by shrinking each request with compression, using either GZip or Google's Snappy.
GZip gives better compression but takes much longer than Snappy.

Experiments:
GZip gives 9.5:1 compression on our 500 lines of metrics and takes 3.5ms.
Snappy gives 4.9:1 compression on the same 500 lines and takes 230.753µs. (It's very fast.)

GZip is generally fast enough and gives better savings.

You should experiment with the compression and use whatever works best for you.
Side note: this does mean that the Kinesis consumer will have to know that the data is compressed, and with what.

The default behaviour for this plugin is the original behaviour.

This also solves this issue:
#4795

This will make the plugin compress the data using GZip before it is shipped to Kinesis.
GZip is slower than Snappy but generally fast enough and gives much better compression. Use GZip in most cases.

If both gzip and snappy are set to true, GZip wins.
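
For anyone wanting to reproduce the GZip/Snappy comparison above, here is a minimal, self-contained sketch (illustrative only; the sample payload below is made up and is not the data behind the numbers quoted in this PR):

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "strings"
    "time"

    "github.com/golang/snappy"
)

func main() {
    // Stand-in for ~500 serialized metrics in influx line protocol.
    payload := []byte(strings.Repeat(
        "cpu,host=web01 usage_idle=98.2,usage_user=1.1 1552668070000000000\n", 500))

    // GZip: compress the whole payload in one pass.
    start := time.Now()
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    zw.Write(payload)
    zw.Close()
    fmt.Printf("gzip:   %d -> %d bytes (%.1f:1) in %s\n",
        len(payload), buf.Len(), float64(len(payload))/float64(buf.Len()), time.Since(start))

    // Snappy: block format, single call.
    start = time.Now()
    sn := snappy.Encode(nil, payload)
    fmt.Printf("snappy: %d -> %d bytes (%.1f:1) in %s\n",
        len(payload), len(sn), float64(len(payload))/float64(len(sn)), time.Since(start))
}

The ratios depend heavily on how repetitive the serialized metrics are, which is why it is worth measuring with your own data.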

What if I set both to false, do I still get aggregated but uncompressed metrics?

@morfien101 (Contributor, Author) replied:

Yes, that is correct. The payloads would be bigger, so you don't get the bandwidth saving, but you do get the saving on how many put requests you make.

Nice, this is awesome stuff. Just getting the metrics rolled up into one payload would give you savings.

@morfien101 (Contributor, Author) replied:

The rollup depends on the partition key type.

random:
If there are more metrics than shards, make as many payloads as there are available shards and fill them. If the payloads are too large (greater than 1020 KB), double the number of payloads (to 2 x shards) and repeat the process. It does this because we want to use all available shards with random partition keys.
If there are fewer metrics than shards, we just create a payload for each metric. It's 1 metric per payload, but if the volume is that low it shouldn't matter.

All others:
Start with 1 payload and, if it doesn't fit into a block of 1020 KB, double the number of payloads. Compression happens at the end.

Compression is done as a second step, so you get your saving on puts and then further savings on bandwidth. Each payload is compressed after it has been made 1020 KB or less, so it will always fit in the put request.
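
A rough sketch of that doubling logic, under the assumption of a hypothetical serialize helper standing in for the plugin's batch serializer (this is not the actual plugin code):

package kinesis

import "github.com/influxdata/telegraf"

// maxPayloadBytes mirrors the 1020 KB ceiling described above, leaving
// headroom under the 1 MB Kinesis record limit.
const maxPayloadBytes = 1020 * 1024

// splitUntilFits spreads the metrics over n buckets and serializes each one,
// doubling n until every serialized bucket is under maxPayloadBytes (or until
// the buckets hold a single metric each and cannot be split further).
func splitUntilFits(metrics []telegraf.Metric, n int, serialize func([]telegraf.Metric) []byte) [][]byte {
    if n < 1 {
        n = 1
    }
    for {
        buckets := make([][]telegraf.Metric, n)
        for i, m := range metrics {
            buckets[i%n] = append(buckets[i%n], m)
        }

        payloads := make([][]byte, 0, n)
        fits := true
        for _, b := range buckets {
            if len(b) == 0 {
                continue
            }
            p := serialize(b)
            if len(p) > maxPayloadBytes {
                fits = false
            }
            payloads = append(payloads, p)
        }
        if fits || n >= len(metrics) {
            return payloads
        }
        n *= 2 // too large: double the number of payloads and try again
    }
}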

As a side note my tests have shown the following:
4 shards available
Put Records (Count) — Sum:

  • Original = 22,500 puts per minute
  • Aggregated and compressed = 240 puts per minute

Incoming Data (Bytes) — Sum:

  • Original = 5.5 MB per minute average
  • Aggregated and compressed = 470 KB per minute average

Added a comment to a function to explain what it should do.
@tehlers320 commented Mar 15, 2019

I compiled this and tested it locally and everything works great. We would have to update our consumer to handle compression. But if it helps the maintainers, this is working great so far at low volume.

     role_arn = "SOMETHING"
     region = "someregion"
     streamname = "somestream"
     data_format = "influx"
     aggregate_metrics = true
     gzip_records =  false
     snappy_records =  false
     partition =  { method = "random" }
     debug = true
2019-03-15T17:01:10Z D! Starting aggregated writer with 7 metrics.
2019-03-15T17:01:10Z I! Wrote: '{
  FailedRecordCount: 0,
  Records: [{
      SequenceNumber: "someshard1",
      ShardId: "shardId-000000000000"
    },{
      SequenceNumber: "someshard2",
      ShardId: "shardId-000000000001"
    }]
}'
2019-03-15T17:01:10Z D! Wrote aggregated metrics in 45.458483ms.
2019-03-15T17:01:10Z D! [outputs.kinesis] wrote batch of 7 metrics in 45.672435ms

This does break the old layout though; users will have to update their configs.
The following is no longer valid:

       [outputs.kinesis.partition]
         method = "tag"
         key = "host"
         default = "mykey"

@glinton (Contributor) commented Mar 15, 2019

Thanks for the code @morfien101 and review @tehlers320. A full review is in the queue, but at first glance I noticed the multiple new bool options. We generally prefer strings/string slices as they're more future proof (no need to add another bool flag and handle that logic if another compression algorithm is desired).

@glinton added the feat (Improvement on an existing feature such as adding a new setting/mode to an existing plugin) and area/aws (AWS plugins including cloudwatch, ecs, kinesis) labels on Mar 15, 2019
@glinton added this to the 1.11.0 milestone on Mar 15, 2019
Updated README file to reflect this.
Updated the sample config to include the configuration changes.
@morfien101 (Contributor, Author):

@glinton I liked the idea of making the configuration option a string. So I have implemented it and updated the relevant documents with the changes.

@morfien101 (Contributor, Author):

@tehlers320 I would be happy to also look at the consumer if this change is accepted.

@morfien101 mentioned this pull request on Apr 8, 2019
@morfien101 (Contributor, Author) commented Apr 18, 2019

I'm not sure what you want me to do about the failing tests. Because it's taken so long to merge, things have moved on in master and there are a lot of file changes pending now.

The tests are failing on the Go dep checks...

@glinton (Contributor) commented Apr 18, 2019

Woah. Can you rebase against our master and push again?

@morfien101 (Contributor, Author):

I have synced my fork/branch with master, and you can see the tests are passing again. When can I expect this to be pulled in so I can stop nursing it? :(

Also, my org is waiting for this feature to be folded in so we can stop using a forked version of Telegraf. :)

@danielnelson (Contributor):

@morfien101 Thanks for the pull request. I've been thinking about how we should tackle compression and dynamic batch sizing in this plugin and others without introducing too much complexity to the plugins, and in as uniform a way as possible.

On compression, I just opened a pull request (#5830) on the amqp output that does something similar to this. I only have gzip support but it would make sense to add snappy in a similar way and use the same naming scheme between plugins.

Batch size, sometimes called aggregating (though I avoid that term here since we usually use it to refer to operations like average/sum/etc), can already be set on a per-plugin basis. I don't think output plugins should attempt to adjust this size directly, or retry themselves, but I think it would make sense to have an error that can be returned from the Write() method to indicate the batch failed to write because it was too large, maybe ErrBatchTooLarge. Telegraf could temporarily reduce the batch size on the next write. This would reduce the tasks that the plugin needs to perform to using SerializeBatch when batch format is enabled.

If that sounds good, I can work on the error for outputs to return and the reduction of write batch size if you could work on the snappy compression and batch handling/detection of too large batches.
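
For what it's worth, the sentinel error being discussed could be as small as this (purely illustrative; neither the name nor the package placement is settled here):

package internal

import "errors"

// ErrBatchTooLarge would be returned from an output's Write() to signal that
// the batch could not be sent because it exceeded the endpoint's size limit,
// letting Telegraf retry with a smaller batch on the next write.
var ErrBatchTooLarge = errors.New("batch too large to write")

// An output would return it when its own size check fails:
//
//     if len(body) > maxRequestBytes { // maxRequestBytes is plugin-specific
//         return internal.ErrBatchTooLarge
//     }
//
// and the agent could detect it with errors.Is(err, internal.ErrBatchTooLarge)
// and temporarily shrink the batch.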

@morfien101 (Contributor, Author):

@danielnelson I like the idea of having the compression built into Telegraf and being easy to call, rather than being something that we need to implement every time we want to use it.

I'm struggling to understand what you want to do with the batch sizing though.
As a plugin developer I see 2 ways of working here.

Batch size of raw metrics - handled by Telegraf
This, to me, is how many metrics are sent to the Write function.

Payload sizes - handled by the output plugin
This is how much data we send in each write request to the endpoint, e.g. the body size of an HTTP request.

To really answer this I had to think about what an output plugin is meant to do. It seems that you want to take away some of the responsibility in order to make it easier to work with.
I see output plugins as payload management for onward services. They do the following:

  • Manipulate the data to better fit the model required on the receiving service.
  • Batch or size the payloads correctly/efficiently.
  • Write the data using the required libraries or connection handlers.

I think there is room to have a standard set of tools available to developers to make payloads which can then be compressed and written out. Payloads could hold raw data and then output compressed data, for example. This is what I'm seeing as the common tasks in the plugins. All other tasks are very specific to the onward service.

I think it is right for the output plugin to manage the batching for the specific service, and Telegraf should just deliver to the plugin as fast as the plugin can handle traffic. The output plugin is best placed to know what the endpoint is expecting and how large the payload can be. However, it is a pain to implement the payload logic with size tracking, etc.

Telegraf sends metrics on size-based and time-based triggers. Optimal size batching is really hard to do when you have time-based triggers. Telegraf already does batch sizing as a common configuration for each plugin. So really all we can do is offer tooling to reduce complexity and create standard working methods.

As for estimating the size of a payload before or after compression has completed: to me it is not worth it. This is really the same thing as varying the batches of metrics. I think the savings you would get would be stripped away every time you need to re-compress and/or write data again. Therefore ErrBatchTooLarge, in my mind, is not really useful.
If the output plugin got 25,000 metrics and managed to compress and send, say, 22,000, then dealing with the rest should be fine and you would still save time and onward resources. If the output's endpoint is that stringent on sizing, perhaps it would need a separate goroutine that accumulates the metrics and forwards them on as and when needed, but I suspect that this is not general use and should be something the output handles as a special case.

I'm thinking we implement an output payload tracker that developers can use as part of their plugin. It should do things like compress payloads, convert metrics to lines using the selected serialiser, track the size of serialised data, etc. Doing this will give users the ability to work in a standard way, which in turn will make it easier for you to review the code when it comes in.

Converting my code in this PR to use something like that would be pretty easy...

@danielnelson (Contributor):

I'm hoping to just have static batch sizes based on the metric_batch_size and no reserialization or recompression in the output.

The ErrBatchTooLarge idea is really meant as a fallback for the case where the batch can't be sent; it's a way to prevent the output from being completely unable to send the data if one particular batch is impossible to send. If it occurs with any sort of frequency, the operator should adjust the batch size down. This feature probably does not need to be added at the same time as this work.

I don't think we need a payload manager though. I could move this hunk (191-203) into a shared location to ease the creation of a ContentEncoder. Then the grouping is only about 5 lines of code:

for _, metric := range metrics {
    routingKey := q.routingKey(metric)
    if _, ok := batches[routingKey]; !ok {
        batches[routingKey] = make([]telegraf.Metric, 0)
    }
    batches[routingKey] = append(batches[routingKey], metric)
}

Maybe the grouping could go into a function:

func PartitionBy(metrics []telegraf.Metric, key func(m telegraf.Metric) string) map[string][]telegraf.Metric

Used like:

batches := PartitionBy(metrics, q.routingKey)

After that we just need to iterate over the groups, serializer.SerializeBatch(), encoder.Encode() and send. I think this is at about the right level of abstraction.
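
A possible implementation of that helper, assuming the slice-based signature sketched above (hypothetical, not existing Telegraf code):

package internal

import "github.com/influxdata/telegraf"

// PartitionBy groups metrics into batches keyed by whatever the key function
// returns for each metric (a routing key, partition key, etc.).
func PartitionBy(metrics []telegraf.Metric, key func(m telegraf.Metric) string) map[string][]telegraf.Metric {
    batches := make(map[string][]telegraf.Metric)
    for _, m := range metrics {
        k := key(m)
        batches[k] = append(batches[k], m)
    }
    return batches
}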

@morfien101 (Contributor, Author):

@danielnelson Your example makes it clearer to me now.

Just to recap to make sure I get it: Telegraf will send a batch whose size is configured by the user. When sending the data after transformations, you can throw an error saying that the payload/batch was too large to send. Telegraf can then reduce the number of messages it sends into the Write function by a factor yet to be decided, in the hope that the reduced batch size will reduce the size of the payload/body being sent out.
This reduces the number of times you may have to break apart the data, reduce its size, re-serialize the batch and then compress it.

I assume that encoder.Encode() would be used to compress and decompress the data with GZip or Snappy?

I'd be happy to help but I'm not sure where to put this in the code base.

@danielnelson modified the milestones: 1.11.0, 1.12.0 on May 24, 2019
@danielnelson (Contributor):

Just to recap to make sure I get it: Telegraf will send a batch whose size is
configured by the user. When sending the data after transformations, you can
throw an error saying that the payload/batch was too large to send. Telegraf
can then reduce the number of messages it sends into the Write function by a
factor yet to be decided, in the hope that the reduced batch size will reduce
the size of the payload/body being sent out.

Right

This reduces the number of times you may have to break apart the data, reduce
its size, re-serialize the batch and then compress it.

It's less about this and more about simplifying the code of the outputs by not making payload sizing or retries something they need to deal with explicitly. To be honest I wish I could find a way to remove the splitting of batches too but I'm not sure there is a good way.

However, the error does provide a fairly simple way to shrink the batches in the case that a payload cannot be sent due to size. It is meant to be a rare occurrence though, which is why I will make Telegraf log a warning each time it happens.

I'd be happy to help but I'm not sure where to put this in the code base.

You could add snappy support here:

https://github.com/influxdata/telegraf/blob/master/internal/content_coding.go

Then we would need to add two options to the output: content_encoding and use_batch_format. The use_batch_format option would work like in the amqp output and is only for backwards compatibility; new plugins should always use SerializeBatch.
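
A snappy addition to that file could be roughly this shape, mirroring the gzip pattern (a sketch against github.com/golang/snappy; the decoder interface is assumed to be Decode([]byte) ([]byte, error)):

package internal

import "github.com/golang/snappy"

// SnappyEncoder compresses a whole payload using snappy's block format.
type SnappyEncoder struct{}

func (*SnappyEncoder) Encode(data []byte) ([]byte, error) {
    return snappy.Encode(nil, data), nil
}

// SnappyDecoder is the matching decoder, useful for consumers and tests.
type SnappyDecoder struct{}

func (*SnappyDecoder) Decode(data []byte) ([]byte, error) {
    return snappy.Decode(nil, data)
}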

I'll add support for the special error type soon; it's a bit more complicated to explain and in fact I'm not yet sure how exactly it will work, but I think it shouldn't be too bad. Afterwards, we should be able to add support to the plugins easily.

@morfien101 (Contributor, Author):

@danielnelson I have managed to get some time next week to work on this between my commitments at work. I'll look at this again and see what still needs to be done.

Would it not be possible to push it in as it currently is, then retrofit the changes that you propose?

I am keen to get our infrastructure using main releases again, as we currently run the code as it stands in a fork.

@danielnelson (Contributor):

It seems like this is way too much code for what's being done; perhaps I'm missing something. Don't we just need a batching loop similar to what I pasted in #5588 (comment)?

@danielnelson modified the milestones: 1.12.0, 1.13.0 on Aug 5, 2019
…kinesis_compressed_metrics

Keeping branch updated
@morfien101 (Contributor, Author):

Kinesis allows you to send data on partitions; the partitions are linked to the shards. So the more partitions you have, the more shards you can use. However, consumers need to know what partitions you have if they want to consume your metrics. This is one way that Kinesis lets you send data.

The other option is that you just use random partitions and send metrics on all shards. Shard numbers can change, so we get a count on startup. (We use this to maximise the sending bandwidth on the stream.)

I think you are looking at the "packageMetrics" function, which looks rather large. However, it's not really that large for what it does.

It needs to deal with both random partition keys and static partition keys. It also needs to split the data into blocks that will fit into the put records for Kinesis. We don't know the size or count of the data designated for static or random keys beforehand, so batch sizing is not helpful and we need to sort it anyway.

Here is the outline of the logic:
When metrics come in, we need to check whether they are for static or random partition keys.
The simplest case is actually the static keys. We serialise the metrics and check whether they will fit in a put request. I expect that 99% of the time they will fit. If they don't fit, we know how many blocks we need, so we split them, make new blocks and repeat the process.

Put requests are measured in bytes, not in how many measurements they contain. So it is technically possible for a single measurement to consume an entire payload, but that is extremely unlikely; more likely is a handful of dense measurements consuming a payload. This also means a batch size doesn't really help here either, since we still need to serialise and count the bytes to see whether it will fit in a put request payload.

If the metrics are held under the random key it's a bit more complicated.

First we need to count how many measurements there are.

  • If there are fewer measurements than shards, we make 1 block and package it for sending. (We gain nothing by splitting such a small number of measurements.)
  • If there are more, we need to split them into blocks, one per shard. (We use as many blocks as shards to maximise throughput.) These then need to be broken down into put request payloads that fit the []byte limit. 99% of the time the first serialisation will fit; it's not guaranteed, but we know the size afterwards, so we know how many blocks we will need.
    After that we need to assign each new put request a random partition key, in our case a GUID.

So to recap:
If random: split into blocks for the shards, serialise to get the size; if it fits, use it; if not, break it down into the required number of blocks and do it again with knowledge of the actual size. Assign keys and pass back.

If static: serialise to get the size; if it fits, use it; if not, break it down into the required number of blocks and do it again with knowledge of the actual size. Pass back.

Metrics are now packaged and ready to send.

Finally, encode (i.e. compress) the put request payloads if required and send them to Kinesis.
Kinesis put requests are never too large because we max them out before compression, mainly because we don't know the size after compression and we don't want to waste time and CPU doing multiple compression cycles.

If it helps, we have been using this code in production and have sent billions of measurements through it already. There is very little CPU/RAM increase that we can actually see; no more than would be expected when enabling more outputs. We run it next to the InfluxDB output.

@danielnelson (Contributor):

It sounds like it is still doing dynamic batch sizing; this is the part I think we shouldn't include. I'd like to just do fixed-size batches and then send them all at once (in writes of up to 500 records, of course). I know it won't be possible to pack quite as tightly, but I don't think it's worth the complexity and extra resource use of attempting it.

How many shards is typical: 10, 100, 1000? On the minimum-shard logic: if it all fits in one payload, doesn't that mean there isn't that much data, in which case why optimize it? If it doesn't fit in one payload then it should get distributed over the shards. High-throughput deployments would have multiple Telegraf instances sending, so you should get a spread from that too.

@morfien101 (Contributor, Author):

Packing tight saves money and increases throughput. Kinesis is billed on how many shards you have (bandwidth available) and also on how many put requests you make. See https://aws.amazon.com/kinesis/data-streams/pricing/ under "Put Payload".
So compression saves on bandwidth and aggregation saves on put requests. (The more aggregation, the better the compression is likely to be as well.)

"Dynamic batching" is done for free in this plugin; here it's called sorting, and we have to do it. We need to sort the metrics and put them into put request blocks. It makes no difference whether you set the batch size to 10 or 10,000; it needs to be done. We also have no idea of the size of the measurements coming in, which means we need to sort them so they fit into the put request payloads, which are measured in bytes, not in how many measurements you have.

I agree that static batch sizing is a good idea; it just doesn't fit well here. It costs more money and doesn't really add any benefit. You can set it, but the same processes need to take place to get the data into Kinesis. Even if you put it in, for this output I would make it a large number to get the savings and throughput that this plugin gives me on Kinesis.
It fits better in a plugin like the InfluxDB output, where you don't really have constraints like size, shards and partitions; there it is more about how many lines you push in, so the two see the world in the same way and work well together.

I can't really tell you how many shards is a typical number. We run 4 shards with this plugin using the code in this PR. I expect we would need 10+ if we weren't using the compression and aggregation.

@morfien101 (Contributor, Author):

@danielnelson Is there any movement on this PR? I'm hoping my comments have explained the situation.

@danielnelson (Contributor):

So as I understand it, we want to put the messages into as few "PUT payloads" as possible. A payload can have up to 500 entries, but the charge is based on the size of the entries, billed for each multiple of 25KB. Do I have that right?

So my main concern is the added complexity to the output, and right now this feels too complex for me to be willing to bring it in. I think part of the issue is that our compression interface is built to compress a full payload and doesn't allow for incremental writing, something that gzip and snappy should both allow:

type ContentEncoder interface {
	Encode([]byte) ([]byte, error)
}
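
An incremental variant could wrap a stream rather than a single byte slice; for example, gzip already exposes a streaming writer, and snappy has snappy.NewBufferedWriter for the same purpose (sketch only, not the interface Telegraf uses today):

package internal

import (
    "compress/gzip"
    "io"
)

// StreamEncoder is a hypothetical incremental alternative to ContentEncoder:
// records are compressed as they are written instead of in one Encode call.
type StreamEncoder interface {
    io.WriteCloser
}

// NewGzipStreamEncoder compresses everything written to it into w; Close must
// be called to flush the final gzip frame.
func NewGzipStreamEncoder(w io.Writer) StreamEncoder {
    return gzip.NewWriter(w)
}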

At any rate, I'd like to take a stab at simplifying this change before merging, and if this ends up being the easiest way and I can't simplify more then at least I'll be satisfied and we can merge it.

Merge branch 'master' of https://github.com/influxdata/telegraf into kinesis_compressed_metrics
@morfien101 (Contributor, Author) commented Aug 27, 2019

I can try to break the code down into simpler sections.

As for encoding the data incrementally, I did some testing on it.
Basically, it boils down to being bad for this plugin in both performance and output size.

Gzip compression with level 7 and Snappy encoding test results:

3,536,106 lines, or 832 MB of data:
  • 667 MB - GZip, compressing each line
  • 88 MB - GZip, compressing everything together
  • 739 MB - Snappy, compressing each line
  • 172 MB - Snappy, compressing everything together

2,000 lines, or 1.1 MB of data:
  • 595 KB - GZip, compressing each line
  • 128 KB - GZip, compressing everything together
  • 735 KB - Snappy, compressing each line
  • 255 KB - Snappy, compressing everything together

Our goal is to get the package into the smallest amount of data possible and sent off within the flush interval. That means it doesn't need to be the fastest code; it just needs to be fast enough. But size and PUT requests matter, as they are what we get billed on.
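
A minimal sketch of how the per-line versus whole-batch comparison above can be reproduced (gzip shown; the same shape works for Snappy with snappy.Encode):

package example

import (
    "bytes"
    "compress/gzip"
)

// gzipSize returns the gzip-compressed size of data.
func gzipSize(data []byte) int {
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    zw.Write(data)
    zw.Close()
    return buf.Len()
}

// compare returns the total compressed size when each line is compressed on
// its own versus when the whole batch is compressed together. The per-line
// total is larger because every line pays for its own gzip header and loses
// the shared dictionary.
func compare(lines [][]byte) (perLine, whole int) {
    for _, l := range lines {
        perLine += gzipSize(l)
    }
    whole = gzipSize(bytes.Join(lines, []byte("\n")))
    return perLine, whole
}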

@danielnelson (Contributor):

So let's assume we are going to do multiple lines per record, with compression. The current behavior will remain only for compatibility. Grouping by partition is also required and is done today, so that's a given.

My understanding is that billing is based only on the PUT units and allocated shards. We can't influence allocated shards in Telegraf, only PUT units. This means that price efficiency is going to be based on how close to the 25KB cutoffs we are. Concretely, a 26KB payload would use 2 units while a 74KB payload would use 3; the price efficiency would be 26/2 = 13KB/unit and 74/3 ≈ 24.7KB/unit.
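
The unit arithmetic from that example, spelled out as a hypothetical helper (25KB here simply means 25 * 1024 bytes):

package example

// putUnits returns how many 25KB PUT payload units a record of the given size
// consumes; every started unit is billed in full, hence the ceiling division.
func putUnits(sizeBytes int) int {
    const unit = 25 * 1024
    return (sizeBytes + unit - 1) / unit
}

// putUnits(26*1024) == 2  -> about 13KB billed per unit
// putUnits(74*1024) == 3  -> about 24.7KB billed per unit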

A typical batch size of 1k - 5k would definitely exceed a single PUT unit, and the most efficient way to send is as a single record so long as it is under the 1MiB limit. If it exceeds this limit, balancing across records is possible but would take a heroic effort, and it's not worth it in my mind as it would provide only a small gain.

This means we really just need to make sure the batch weighs in under the 1MiB record limit. With snappy compression this is still 8k lines, and if a user has multiple shards it will be divided across these as well. So in the normal case, the entire batch will fit in a single record. Only if a user configures the metric_batch_size to be too large will this be a problem, or in the case of a rogue batch that has very long lines or is incompressible.

The rogue batch case could be handled by the ErrBatchTooLarge idea I floated above. All that remains for us then is to take the entire batch and serialize and compress it into a single record per partition. We could record the size of the records for splitting across the 5MiB total request limit.

@morfien101 (Contributor, Author):

So billing is one aspect that we need to consider. I figured that for billing we cover it as best we can with compression and aggregation, to get the data that we are pushing as small as possible.

The other aspect is the bandwidth limits per shard, both read and write. We really only care about the PUT side of things in this plugin. The random partition keys allow us to split our payloads (PUT requests) between shards, therefore allowing higher throughput (this is how we use it). Telegraf is very capable of pushing data into Kinesis faster than a single shard is able to keep up with.

A single maxed-out PUT request (5 MiB) on a single shard would theoretically take 5 seconds to ingest, as a shard has a maximum ingestion rate of 1 MiB/sec. However, given 5 shards, random partition keys and the current code, we can get that down to 1 second.

https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html

A typical batch size of 1k - 5k would definitely exceed a single PUT unit, and the most efficient way to send is as a single record so long as it is under the 1MiB limit. If it exceeds this limit, balancing across records is possible but would take a heroic effort, and it's not worth it in my mind as it would provide only a small gain.

This is only true for static partition keys and the price of ingestion, as explained above. When you use random partition keys, you would be looking to make best use of the bandwidth available by creating multiple records spread across multiple PUT requests and then sent to multiple shards. Aggregation and compression in this case serve to lower the price of ingestion but, more importantly, increase the amount of data that you are capable of sending.

// * ErrCodeProvisionedThroughputExceededException "ProvisionedThroughputExceededException"
// The request rate for the stream is too high, or the requested data is too
// large for the available throughput. Reduce the frequency or size of your
// requests. For more information, see Streams Limits (http://docs.aws.amazon.com/kinesis/latest/dev/service-sizes-and-limits.html)
// in the Amazon Kinesis Data Streams Developer Guide, and Error Retries and
// Exponential Backoff in AWS (http://docs.aws.amazon.com/general/latest/gr/api-retries.html)
// in the AWS General Reference.

ErrBatchTooLarge, in my mind, is only required to slow down the sending IF AWS Kinesis throws exceptions because you are sending too fast. At that point Telegraf either needs to buffer, wait and retry, or, if it's happening consistently, the user needs to increase the shard count and make better use of partition keys if they are static.

@danielnelson (Contributor):

When you use random partition keys, you would be looking to make best use of the bandwidth available by creating multiple records spread across multiple PUT requests and then sent to multiple shards.

I see. With most of the partition strategies there is a finite set of batches, and there is no reason for them not to be as large as possible. But with random we would like to have more batches, while still filling PUT units.

It seems that for the best compression and the best performance it would be ideal to have the number of requests be no more than the number of shards, as more requests would hurt compression and fewer might hurt throughput.

What if we just ask the user to specify this for us? So you could use a random key with a factor of 5 and it will split the metrics evenly by count into 5 batches, assign each a random key, and then compress and send.

This removes all the trial and error and provides more flexibility based on your shard count and how many Telegraf producers you are running. For example, if you have 20 Telegraf producers and only 10 shards you would probably still want each to generate a single request. On the other hand, if you have 1 Telegraf producer and 10 shards you would want Telegraf to produce 10 batches.
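
What that user-specified factor might look like in code (a sketch; splitByCount and its key format are illustrative, not the plugin's actual API):

package kinesis

import (
    "fmt"
    "math/rand"

    "github.com/influxdata/telegraf"
)

// splitByCount divides a batch evenly (by metric count) into n groups, each
// assigned its own random partition key, as suggested above. A real
// implementation would likely use a GUID for the key.
func splitByCount(metrics []telegraf.Metric, n int) map[string][]telegraf.Metric {
    if n < 1 {
        n = 1
    }
    keys := make([]string, n)
    for i := range keys {
        keys[i] = fmt.Sprintf("%08x-%08x", rand.Uint32(), rand.Uint32())
    }
    out := make(map[string][]telegraf.Metric, n)
    for i, m := range metrics {
        k := keys[i%n]
        out[k] = append(out[k], m)
    }
    return out
}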

ErrBatchTooLarge, in my mind, is only required to slow down the sending IF AWS Kinesis throws exceptions because you are sending too fast. At that point Telegraf either needs to buffer, wait and retry, or, if it's happening consistently, the user needs to increase the shard count and make better use of partition keys if they are static.

I've been meaning to make a change to how Telegraf behaves in the case of an error; right now it will retry as soon as a new batch is ready, but IMO it should wait until the next flush_interval on any error.

@morfien101 (Contributor, Author):

I think we are getting to the point where we have an understanding.

What if we just ask the user to specify this for us? So you could use a random key with a factor of 5 and it will split the metrics evenly by count into 5 batches, assign each a random key, and then compress and send.

This idea is OK but adds extra complexity. The code currently doesn't check whether a record makes maximum use of the space available to it. In fact, it is a bit wasteful in order to save resources (time and CPU) on the sender's side (Telegraf).
I'll explain.
When metrics come in they are line protocol, and the only information we have available is the line count. We assume that, with best effort, we can get all static partition keys into a single PUT request record body, so we go straight for serialisation. This serves two purposes: 1) after serialisation we have an uncompressed version of the body, which we can get a byte count from; 2) with the byte count we can see whether it will fit in a request body (len(body) less than or equal to max body). The byte count also tells us how many buckets we need (body size / max body), so we make that many buckets, fill them all with lines and serialise.

If it's random keys, we check the count of lines and compare it against the number of shards. Without knowing the size, we assume that fewer lines than the shard count is best sent as one payload; if there are more, we split them to get bandwidth utilisation. So we make as many buckets as there are shards, fill them with lines, then treat each bucket as if it were a static bucket. At the end we have a number of buckets, each with a body that would fit compressed or not. We then assign each bucket its own randomly generated key, as if it were static.

We have not compressed yet. We make sure that the bodies will fit uncompressed because we don't yet know whether there will be compression. Also, if it fits uncompressed, it will definitely fit compressed.
We waste a bit of space in the record, but that's OK because we can make up for it by having multiple records in a put request.

So we waste size in the records because we choose to process all the metrics BEFORE we compress and know the actual size of the bodies.
However, when it comes to building up the PUT request, we can have up to 500 records with a maximum of 5 MiB of data, including the partition keys.
We are more likely to blow the size cap before we hit 500 records. We measure the bodies as we allocate them to the PUT request to fit in as much as possible, and do multiple requests if required.

Getting onto a fixed number: if we set the number in stone, say 5 as per your example, we would need to split the data into 5 buckets, serialise and get a byte count. We can now check whether the data is too large in its current state. If 1 of the 5 buckets is too large we throw an ErrBatchTooLarge, or we could hold tight and check after compression.
Let's assume that we hold out in the new code: we move on to compression (aka encoding), encode using what the user specified and then get a byte count. Do they all fit?
If they all fit, awesome, move on and send. If not, throw an ErrBatchTooLarge, but what's the cost?
We have now sorted them and compressed all of them, and because ErrBatchTooLarge is generic, we don't know what could have been sent and what needs to be repackaged. So we take less and do it all again.

If we wanted a fixed number, I would rather suggest that we say it's best effort and the closest number we can get. We already need to connect to Kinesis and do a describe on the stream, which returns a shard count, so we get that number for free. So rather than starting with the shard count as the number of starting buckets, we start with what the user specified and work up from there. TBH, I like the idea of having a starting number, as it would make good use of the space available in the request body. It's simple enough to implement, but I'll hold off until we agree to ship what we have, maybe with this minor change.

This removes all the trial and error

I don't think it does. Arguably, waiting until you are ready to send, after sorting, serialisation and then compression, is the only time that you REALLY know the batch was too large to fit in a single push. By then all the hard work in the code is done. So two options remain: either just accept that you may need to do more than a single PUT request on each interval, or drop everything you currently have, return an error, cut it in half and do it again, hoping it will fit.

Ironically, the BEST bandwidth usage would actually result in the worst performance. We could get maximum usage of put requests by compressing, checking the size, adding more and repeating until we have a full 1 MiB, but that would be insane on CPU and time. ;)

@morfien101 (Contributor, Author):

@danielnelson Any update?

@danielnelson (Contributor):

I'd like to use a fixed configuration for the record creation; I'm firmly against the plugin doing multiple compression passes or dynamic record sizes. I can put together a pull request to implement this.

Partitioning must be done, of course, based on the partition key. For random key partitioning we can limit the number of keys to the number of records.

We can split the records at a reasonable number of metrics; I think 1000 is a good starting point, as it should result in an average record of around 500KB without compression. With compression it will be much smaller, but still large enough to fill several PUT units. I'm open to making this number configurable as well.

The 5MB limit shouldn't come into play unless the batch size is set excessively high. In this case the user can simply reduce the metric batch size.

@danielnelson (Contributor):

Take a look at #6384; this is approximately how I think we should do it. The code is completely untested right now, as I need to set up my AWS environment again for Kinesis development.

@morfien101 (Contributor, Author) commented Sep 30, 2019

TBH I have lost interest in this PR; it's been dragging on for nearly 7 months now. I have created an external plugin that makes use of the Exec output plugin. If anyone else wants to use it: https://github.com/morfien101/telegraf-output-kinesis

Feel free to close this or just put in the code that you wanted.

@sjwang90 modified the milestones: 1.13.0, 1.14.0 on Nov 15, 2019
@sjwang90 removed this from the 1.14.0 milestone on Feb 21, 2020
@sjwang90 (Contributor):

@morfien101 I'm going to add this Kinesis output to our list of external plugins to be used with execd. Let me know if you have any problems with this.

If you want to update any of the repo's README information or add the execd Go shim, check out some of our external plugin docs.

@morfien101 (Contributor, Author):

@sjwang90 Sure, I'd be happy with that. I'll also have a look at the docs and make any adjustments.

@tehlers320:

TBH I have lost interest in this PR; it's been dragging on for nearly 7 months now. I have created an external plugin that makes use of the Exec output plugin. If anyone else wants to use it: https://github.com/morfien101/telegraf-output-kinesis

Feel free to close this or just put in the code that you wanted.

Yes, this process took a bit too long; I ended up building my own binary off of this and using it. Now I am less likely to upgrade versions as time goes forward.

@sjwang90 (Contributor):

Added to external plugins list
