
fix: refactor azureeventhubrehydrationreceiver to stream blobs as to not lock up on larger environments (BPOP-831) #2098

Merged: 21 commits merged into release/v1.69.0 from feat/paginate-azure-eventhub-rehydration on Jan 17, 2025

Conversation

@schmikei (Contributor) commented Jan 7, 2025

Proposed Change

  • Stream the results of the blob listing rather than requesting all blobs in one lump sum; the previous lump-sum approach caused the collector to stall during large rehydration efforts using Azure Event Hub. A rough sketch of the streaming approach is shown below.
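
The sketch below uses the Azure SDK's flat-listing pager; the connection string, container name, page size, and printing are placeholders, not the PR's actual implementation.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

func main() {
    // Placeholder connection string and container name.
    client, err := azblob.NewClientFromConnectionString("<connection-string>", nil)
    if err != nil {
        log.Fatal(err)
    }

    // Roughly what a batch_size/page_size option would control.
    pageSize := int32(1000)
    pager := client.NewListBlobsFlatPager("my-container", &azblob.ListBlobsFlatOptions{
        MaxResults: &pageSize,
    })

    // Each page is handled as soon as it arrives (and could be pushed onto a
    // channel), so memory stays bounded even for very large containers.
    for pager.More() {
        page, err := pager.NextPage(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        for _, blob := range page.Segment.BlobItems {
            fmt.Println(*blob.Name)
        }
    }
}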
Checklist
  • Changes are tested
  • CI has passed

@schmikei changed the title from "pre tests working; refactor to stream blobs as to not lock up" to "fix: refactor azureeventhubrehydrationreceiver to stream blobs as to not lock up on larger environments (BPOP-831)" on Jan 7, 2025
@schmikei marked this pull request as ready for review January 9, 2025 21:41
@schmikei requested review from dpaasman00 and a team as code owners January 9, 2025 21:41
@schmikei (Contributor, Author) commented Jan 9, 2025

Opening this up for review, but I'm going to see if I can add some more tests/benchmarks before merging anything.

@dpaasman00 (Contributor) left a comment:

Some nits and a context thing.

receiver/azureblobrehydrationreceiver/README.md (outdated)
receiver/azureblobrehydrationreceiver/config.go (outdated)
receiver/azureblobrehydrationreceiver/receiver.go (outdated)
receiver/azureblobrehydrationreceiver/receiver.go (outdated)
receiver/azureblobrehydrationreceiver/config.go (outdated)
@jsirianni self-assigned this Jan 14, 2025
@jsirianni (Member) left a comment:

Working great. I did run into upgrade issues, left a comment around that.

Comment on lines 71 to 77
if c.PollInterval != 0 {
return errors.New("poll_interval is no longer supported and batch_size/page_size should be used instead")
}

if c.PollTimeout != 0 {
return errors.New("poll_timeout is no longer supported and batch_size/page_size should be used instead")
}
Member:

This is a breaking change. Can we log warnings and ignore these options instead of failing?

{
  "level":"fatal",
  "ts":"2025-01-14T10:50:55.721-0500",
  "caller":"collector/main.go:121",
  "msg":"RunService returned error",
  "error":"failed to start service: error during OpAmp connection: collector failed to start: invalid configuration: receivers::azureblobrehydration/source1_01JHJQHCEX5NHT9HN2RS9FCRHX: poll_interval is no longer supported and batch_size/page_size should be used instead"}

I get that error when using Bindplane's Azure Blob source.

receivers:
    azureblobrehydration/source1_01JHJQHCEX5NHT9HN2RS9FCRHX:
        connection_string: redacted
        container: test
        delete_on_read: false
        ending_time: 2025-01-15T00:00
        poll_interval: 1m
        starting_time: 2025-01-01T00:00

This will allow us some time to get the Bindplane source updated, and provide users with upgrade compatibility. The source docs could include some disclaimers around agent versions and supported config options.

Contributor Author:

Yep, I can look into that; it was something originally brought up here, which I discussed with @dpaasman00: #2098 (comment)

@dpaasman00 I think I'm leaning towards what Joe is suggesting here: just logging a warning within Start for a couple of releases in order to maintain backward compatibility. Something like the sketch below.
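
A rough sketch of that warn-and-ignore approach (not the PR's actual code; the struct and Config fields mirror the snippet above but are simplified stand-ins):

package rehydrationsketch

import (
    "context"
    "time"

    "go.opentelemetry.io/collector/component"
    "go.uber.org/zap"
)

// Simplified stand-ins for the receiver's real config and struct.
type Config struct {
    PollInterval time.Duration
    PollTimeout  time.Duration
}

type rehydrationReceiver struct {
    logger *zap.Logger
    cfg    *Config
}

// Start warns about the deprecated options and otherwise ignores them, so
// existing Bindplane-managed configs keep starting while the source is updated.
func (r *rehydrationReceiver) Start(_ context.Context, _ component.Host) error {
    if r.cfg.PollInterval != 0 {
        r.logger.Warn("poll_interval is no longer used; use batch_size/page_size instead")
    }
    if r.cfg.PollTimeout != 0 {
        r.logger.Warn("poll_timeout is no longer used; use batch_size/page_size instead")
    }
    // ... existing start logic ...
    return nil
}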

Contributor:

Yea that works for me!

@jsirianni (Member) left a comment:

Edit: the Debug exporter samples its output, which explains my findings :)

Still working great; however, I am having some performance issues.

Without a batch processor, I seem to get five logs per request. I get a burst of requests and then a ~7 second delay before the next "burst".

{"level":"info","ts":"2025-01-14T16:01:09.776-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:10.919-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:10.999-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:11.080-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:11.161-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:11.241-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:11.322-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:11.402-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:11.484-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:11.563-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:11.644-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}

DELAY

{"level":"info","ts":"2025-01-14T16:01:19.787-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:20.934-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.015-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.097-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.179-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.265-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.351-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.432-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.512-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.592-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}
{"level":"info","ts":"2025-01-14T16:01:21.675-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":5,"log records":5}

When I use a batch processor with a 2 second send interval, the logs seem to be more steady.

{"level":"info","ts":"2025-01-14T16:01:40.796-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":60,"log records":60}
{"level":"info","ts":"2025-01-14T16:01:42.798-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":105,"log records":105}
{"level":"info","ts":"2025-01-14T16:01:44.799-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":105,"log records":105}
{"level":"info","ts":"2025-01-14T16:01:46.800-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":120,"log records":120}
{"level":"info","ts":"2025-01-14T16:01:48.801-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":125,"log records":125}
{"level":"info","ts":"2025-01-14T16:01:50.803-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":100,"log records":100}
{"level":"info","ts":"2025-01-14T16:01:52.805-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":125,"log records":125}
{"level":"info","ts":"2025-01-14T16:01:54.806-0500","msg":"Logs","kind":"exporter","data_type":"logs","name":"debug/debug","resource logs":115,"log records":115}

@jsirianni (Member) left a comment:

General question: With the concurrency changes, we should make sure the shutdown logic allows the receiver to process all in flight batches before shutting down or persisting the marker to storage.

It may already be doing this, just want to be sure.

checkpointStore: rehydration.NewNopStorage(),
startingTime: startingTime,
endingTime: endingTime,
ctx: ctx,
cancelFunc: cancel,
blobChan: make(chan *azureblob.BlobResults),
Member:

I think we should consider making this a buffered channel. I think it would be okay for the receiver to slow to a crawl if the channel is full and we are waiting on blobs to be processed and sent down the pipeline.

Contributor Author:

Good idea 👍
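
A minimal, self-contained sketch of the buffered-channel backpressure idea (blobChanSize is the constant from the PR; everything else is a stand-in, not the PR's code):

package main

import "fmt"

// Stand-in for *azureblob.BlobResults.
type blobBatch []string

// blobChanSize comes from the PR; the buffer lets the lister run a few batches
// ahead, then blocks until consumers catch up instead of holding every batch in memory.
const blobChanSize = 5

func main() {
    blobChan := make(chan blobBatch, blobChanSize)

    // Producer: in the receiver this would be the pager loop pushing each page.
    go func() {
        defer close(blobChan)
        for i := 0; i < 20; i++ {
            blobChan <- blobBatch{fmt.Sprintf("blob-%d", i)}
        }
    }()

    // Consumer: in the receiver this would download each blob and send the
    // parsed OTLP data down the pipeline.
    for batch := range blobChan {
        fmt.Println("processing", batch)
    }
}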

Comment on lines 221 to 222
r.logger.Warn("No blobs processed for 3 consecutive polls, assuming no more blobs to process")
return
Member:

I am still curious about this. It feels awkward to me to have the receiver stop processing logs despite continuing to have the process run.

I think it would be nice to progressively back off, starting with some short interval in the seconds and maxing out in the minutes. If someone leaves the agent running too long and it is backing off to retrying every 5m or 10m, I would not expect their Azure bill to be impacted much.
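
A rough sketch of what that progressive backoff could look like (illustrative only; the intervals and the pollOnce helper are stand-ins, not anything in the PR):

package pollsketch

import (
    "context"
    "time"
)

// pollLoop polls until the context is cancelled, backing off progressively when
// a poll handles no blobs and resetting whenever progress is made. pollOnce is a
// stand-in for one listing/processing pass that reports how many blobs it handled.
func pollLoop(ctx context.Context, pollOnce func(context.Context) int) {
    const (
        minBackoff = 5 * time.Second
        maxBackoff = 10 * time.Minute
    )
    backoff := minBackoff

    for {
        if processed := pollOnce(ctx); processed > 0 {
            backoff = minBackoff // reset whenever we make progress
        } else {
            backoff *= 2
            if backoff > maxBackoff {
                backoff = maxBackoff
            }
        }

        select {
        case <-ctx.Done():
            return
        case <-time.After(backoff):
        }
    }
}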

Contributor Author:

I would generally agree that a backoff approach could be taken; I was mostly just trying to keep the old behavior of the receiver stopping requests when nothing is being processed, as a cost-saving measure. I think Corbin was focusing on that for this use case rather than on a backoff approach.

@dpaasman00 do you have any thoughts on this, since you've looked at S3 rehydration?

Member:

Okay, I'm okay with keeping it the same, but would like to hear Ryan's take on it.

Member:

It could certainly be a future change.

Contributor:

It looks like this functionality was removed, but I think that's okay since we're now using a better pagination process. We shouldn't need to worry about getting empty polls anymore.

@schmikei (Contributor, Author) commented:

General question: With the concurrency changes, we should make sure the shutdown logic allows the receiver to process all in flight batches before shutting down or persisting the marker to storage.

It may already be doing this, just want to be sure.

It should be doing this because of the wait group but I'll write a test to make sure!
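
For reference, a rough sketch of that shutdown ordering (cancelFunc and makeCheckpoint appear in the PR; the WaitGroup field and the stub below are assumptions for the sketch):

package shutdownsketch

import (
    "context"
    "sync"
)

type rehydrationReceiver struct {
    cancelFunc context.CancelFunc
    wg         sync.WaitGroup
}

func (r *rehydrationReceiver) Shutdown(ctx context.Context) error {
    // Stop producing new work first...
    r.cancelFunc()
    // ...then wait for in-flight batches so the checkpoint reflects everything
    // that was actually consumed...
    r.wg.Wait()
    // ...and only then persist the marker.
    return r.makeCheckpoint(ctx)
}

// makeCheckpoint stands in for the real method that persists the rehydration marker.
func (r *rehydrationReceiver) makeCheckpoint(_ context.Context) error { return nil }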

Comment on lines +104 to +106
// factor of buffered channel size
// number of blobs to process at a time is blobChanSize * batchSize
const blobChanSize = 5
Member:

Cool, this seems like a good size; it's performing well for me. In the future maybe we can expose it. I don't see a need to right now.

@jsirianni (Member) left a comment:

This is working well for me. I'd like @dpaasman00 to approve as well; he knows more of the collector's inner workings.

@schmikei (Contributor, Author) commented:

@dpaasman00 was noticing some shutdown hanging, but with my most recent commit I think this is ready for another look if you have a moment!

@dpaasman00 (Contributor) left a comment:

Some comments, nothing major.

@dpaasman00 (Contributor) left a comment:

Left a few more comments about nits, but this looks good. Merge when you're happy with it.

2. The receiver will parse each blob's path to determine if it matches a path created by the [Azure Blob Exporter](../../exporter/azureblobexporter/README.md#blob-path).
3. If the blob path is from the exporter, the receiver will parse the timestamp represented by the path.
4. If the timestamp is within the configured range the receiver will download the blob and parse its contents into OTLP data.

a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip.

> Note: There is no current way of specifying a time range to rehydrate so any blobs outside fo the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration.
Contributor:

Suggested change:

- > Note: There is no current way of specifying a time range to rehydrate so any blobs outside fo the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration.
+ > Note: There is no current way of specifying a time range to rehydrate so any blobs outside of the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration.

var marker *string
var errs error
if err := r.makeCheckpoint(shutdownCtx); err != nil {
r.logger.Error("Error while saving checkpoint", zap.Error(err))
Contributor:

I think this log can get removed since we're in shutdown and the error will get reported. Pretty minor nit though, so up to you.

// Go through each blob and parse it's path to determine if we should consume it or not
r.logger.Debug("received a batch of blobs, parsing through them to determine if they should be rehydrated", zap.Int("num_blobs", len(blobs)))
Contributor:

Suggested change:

- r.logger.Debug("received a batch of blobs, parsing through them to determine if they should be rehydrated", zap.Int("num_blobs", len(blobs)))
+ r.logger.Debug("Received a batch of blobs, parsing through them to determine if they should be rehydrated", zap.Int("num_blobs", len(blobs)))

@schmikei merged commit 9a225bf into release/v1.69.0 on Jan 17, 2025
15 checks passed
@schmikei deleted the feat/paginate-azure-eventhub-rehydration branch January 17, 2025 15:01