From 3b6cebe8296eba470c61b0ffa134a1ede4b389eb Mon Sep 17 00:00:00 2001 From: schmikei Date: Tue, 7 Jan 2025 16:46:17 -0500 Subject: [PATCH 01/17] pre tests working; refactor to stream blobs as to not lock up --- .../azureblobrehydrationreceiver/config.go | 14 ++ .../azureblobrehydrationreceiver/factory.go | 2 + .../internal/azureblob/blob_client.go | 98 ++++++++++-- .../azureblob/mocks/mock_blob_client.go | 41 ++++- .../azureblobrehydrationreceiver/receiver.go | 145 +++++++----------- .../receiver_test.go | 6 +- 6 files changed, 207 insertions(+), 99 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/config.go b/receiver/azureblobrehydrationreceiver/config.go index 1d69e5a57..08427a276 100644 --- a/receiver/azureblobrehydrationreceiver/config.go +++ b/receiver/azureblobrehydrationreceiver/config.go @@ -25,6 +25,9 @@ import ( // Config is the configuration for the azure blob rehydration receiver type Config struct { + // BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 1000) + BatchSize int `mapstructure:"batch_size"` + // ConnectionString is the Azure Blob Storage connection key, // which can be found in the Azure Blob Storage resource on the Azure Portal. (no default) ConnectionString string `mapstructure:"connection_string"` @@ -45,6 +48,9 @@ type Config struct { // Default value of false DeleteOnRead bool `mapstructure:"delete_on_read"` + // PageSize is the number of blobs to request from the Azure API at a time. (default 1000) + PageSize int `mapstructure:"page_size"` + // PollInterval is the interval at which the Azure API is scanned for blobs. // Default value of 1m PollInterval time.Duration `mapstructure:"poll_interval"` @@ -58,6 +64,10 @@ type Config struct { // Validate validates the config func (c *Config) Validate() error { + if c.BatchSize < 1 { + return errors.New("batch_size must be greater than 0") + } + if c.ConnectionString == "" { return errors.New("connection_string is required") } @@ -81,6 +91,10 @@ func (c *Config) Validate() error { return errors.New("ending_time must be at least one minute after starting_time") } + if c.PageSize < 1 { + return errors.New("page_size must be greater than 0") + } + if c.PollInterval < time.Second { return errors.New("poll_interval must be at least one second") } diff --git a/receiver/azureblobrehydrationreceiver/factory.go b/receiver/azureblobrehydrationreceiver/factory.go index 6465eb47f..f887178c0 100644 --- a/receiver/azureblobrehydrationreceiver/factory.go +++ b/receiver/azureblobrehydrationreceiver/factory.go @@ -45,6 +45,8 @@ func createDefaultConfig() component.Config { DeleteOnRead: false, PollInterval: time.Minute, PollTimeout: time.Second * 30, + BatchSize: 100, + PageSize: 1000, } } diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index dd5373c75..222e66c1c 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -41,33 +41,39 @@ type BlobClient interface { // DeleteBlob deletes the blob in the specified container DeleteBlob(ctx context.Context, container, blobPath string) error + + // StreamBlobs will stream blobs to the blobChan and errors to the errChan, generally if an errChan gets an item + // then the stream should be stopped + StreamBlobs(ctx context.Context, container string, prefix, marker *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) } // AzureClient is an implementation of the BlobClient for Azure type AzureClient struct { - azClient *azblob.Client + azClient *azblob.Client + batchSize int + pageSize int32 } // NewAzureBlobClient creates a new azureBlobClient with the given connection string -func NewAzureBlobClient(connectionString string) (BlobClient, error) { +func NewAzureBlobClient(connectionString string, batchSize, pageSize int) (BlobClient, error) { azClient, err := azblob.NewClientFromConnectionString(connectionString, nil) if err != nil { return nil, err } return &AzureClient{ - azClient: azClient, + azClient: azClient, + batchSize: batchSize, + pageSize: int32(pageSize), }, nil } -// contentLengthKey key for the content length metadata -const contentLengthKey = "ContentLength" - // ListBlobs returns a list of blobInfo objects present in the container with the given prefix func (a *AzureClient) ListBlobs(ctx context.Context, container string, prefix, marker *string) ([]*BlobInfo, *string, error) { listOptions := &azblob.ListBlobsFlatOptions{ - Marker: marker, - Prefix: prefix, + Marker: marker, + Prefix: prefix, + MaxResults: &a.pageSize, } pager := a.azClient.NewListBlobsFlatPager(container, listOptions) @@ -94,7 +100,6 @@ func (a *AzureClient) ListBlobs(ctx context.Context, container string, prefix, m Name: *blob.Name, Size: *blob.Properties.ContentLength, } - blobs = append(blobs, info) } nextMarker = resp.NextMarker @@ -103,6 +108,81 @@ func (a *AzureClient) ListBlobs(ctx context.Context, container string, prefix, m return blobs, nextMarker, nil } +const emptyPollLimit = 3 + +// BlobResults contains the blobs for the receiver to process and the last marker +type BlobResults struct { + Blobs []*BlobInfo + LastMarker *string +} + +// StreamBlobs will stream blobs to the blobChan and errors to the errChan, generally if an errChan gets an item +// then the stream should be stopped +func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix, beginningMarker *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) { + pager := a.azClient.NewListBlobsFlatPager(container, &azblob.ListBlobsFlatOptions{ + Marker: beginningMarker, + Prefix: prefix, + MaxResults: &a.pageSize, + }) + + emptyPollCount := 0 + for pager.More() { + select { + case <-ctx.Done(): + return + default: + // If we had empty polls for the last 3 times, then we can assume that there are no more blobs to process + // and we can close the stream to avoid charging for the requests + if emptyPollCount == emptyPollLimit { + close(doneChan) + return + } + + resp, err := pager.NextPage(ctx) + if err != nil { + errChan <- fmt.Errorf("error streaming blobs: %w", err) + return + } + + batch := []*BlobInfo{} + for _, blob := range resp.Segment.BlobItems { + if blob.Deleted != nil && *blob.Deleted { + continue + } + if blob.Name == nil || blob.Properties == nil || blob.Properties.ContentLength == nil { + continue + } + + info := &BlobInfo{ + Name: *blob.Name, + Size: *blob.Properties.ContentLength, + } + batch = append(batch, info) + if len(batch) == int(a.batchSize) { + blobChan <- &BlobResults{ + Blobs: batch, + LastMarker: resp.NextMarker, + } + batch = []*BlobInfo{} + } + } + + if len(batch) == 0 { + emptyPollCount++ + continue + } + + emptyPollCount = 0 + blobChan <- &BlobResults{ + Blobs: batch, + LastMarker: resp.NextMarker, + } + } + } + + close(doneChan) +} + // DownloadBlob downloads the contents of the blob into the supplied buffer. // It will return the count of bytes used in the buffer. func (a *AzureClient) DownloadBlob(ctx context.Context, container, blobPath string, buf []byte) (int64, error) { diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go index 8e24aa1c5..32dbce7bc 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.49.0. DO NOT EDIT. package mocks @@ -200,6 +200,45 @@ func (_c *MockBlobClient_ListBlobs_Call) RunAndReturn(run func(context.Context, return _c } +// StreamBlobs provides a mock function with given fields: ctx, container, prefix, marker, errChan, blobChan, doneChan +func (_m *MockBlobClient) StreamBlobs(ctx context.Context, container string, prefix *string, marker *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{}) { + _m.Called(ctx, container, prefix, marker, errChan, blobChan, doneChan) +} + +// MockBlobClient_StreamBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamBlobs' +type MockBlobClient_StreamBlobs_Call struct { + *mock.Call +} + +// StreamBlobs is a helper method to define mock.On call +// - ctx context.Context +// - container string +// - prefix *string +// - marker *string +// - errChan chan error +// - blobChan chan *azureblob.BlobResults +// - doneChan chan struct{} +func (_e *MockBlobClient_Expecter) StreamBlobs(ctx interface{}, container interface{}, prefix interface{}, marker interface{}, errChan interface{}, blobChan interface{}, doneChan interface{}) *MockBlobClient_StreamBlobs_Call { + return &MockBlobClient_StreamBlobs_Call{Call: _e.mock.On("StreamBlobs", ctx, container, prefix, marker, errChan, blobChan, doneChan)} +} + +func (_c *MockBlobClient_StreamBlobs_Call) Run(run func(ctx context.Context, container string, prefix *string, marker *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{})) *MockBlobClient_StreamBlobs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(*string), args[4].(chan error), args[5].(chan *azureblob.BlobResults), args[6].(chan struct{})) + }) + return _c +} + +func (_c *MockBlobClient_StreamBlobs_Call) Return() *MockBlobClient_StreamBlobs_Call { + _c.Call.Return() + return _c +} + +func (_c *MockBlobClient_StreamBlobs_Call) RunAndReturn(run func(context.Context, string, *string, *string, chan error, chan *azureblob.BlobResults, chan struct{})) *MockBlobClient_StreamBlobs_Call { + _c.Call.Return(run) + return _c +} + // NewMockBlobClient creates a new instance of MockBlobClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockBlobClient(t interface { diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index f55eefb09..91982483d 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -40,15 +40,17 @@ type rehydrationReceiver struct { azureClient azureblob.BlobClient supportedTelemetry pipeline.Signal consumer rehydration.Consumer + checkpoint *rehydration.CheckPoint checkpointStore rehydration.CheckpointStorer + lastBlob *azureblob.BlobInfo + lastBlobTime *time.Time + startingTime time.Time endingTime time.Time - doneChan chan struct{} started bool - ctx context.Context - cancelFunc context.CancelCauseFunc + cancelFunc context.CancelFunc } // newMetricsReceiver creates a new metrics specific receiver. @@ -92,7 +94,7 @@ func newTracesReceiver(id component.ID, logger *zap.Logger, cfg *Config, nextCon // newRehydrationReceiver creates a new rehydration receiver func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (*rehydrationReceiver, error) { - azureClient, err := newAzureBlobClient(cfg.ConnectionString) + azureClient, err := newAzureBlobClient(cfg.ConnectionString, cfg.BatchSize, cfg.PageSize) if err != nil { return nil, fmt.Errorf("new Azure client: %w", err) } @@ -109,55 +111,41 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (* return nil, fmt.Errorf("invalid ending_time timestamp: %w", err) } - ctx, cancel := context.WithCancelCause(context.Background()) - return &rehydrationReceiver{ logger: logger, id: id, cfg: cfg, azureClient: azureClient, - doneChan: make(chan struct{}), checkpointStore: rehydration.NewNopStorage(), startingTime: startingTime, endingTime: endingTime, - ctx: ctx, - cancelFunc: cancel, }, nil } // Start starts the rehydration receiver func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) error { - if r.cfg.StorageID != nil { checkpointStore, err := rehydration.NewCheckpointStorage(ctx, host, *r.cfg.StorageID, r.id, r.supportedTelemetry) if err != nil { return fmt.Errorf("NewCheckpointStorage: %w", err) } - r.checkpointStore = checkpointStore } r.started = true - go r.scrape() + go r.streamRehydrateBlobs(ctx) return nil } // Shutdown shuts down the rehydration receiver func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { - r.cancelFunc(errors.New("shutdown")) var err error - // If we have called started then close and wait for goroutine to finish - if r.started { - select { - case <-ctx.Done(): - err = ctx.Err() - case <-r.doneChan: - } + if r.cancelFunc != nil { + r.cancelFunc() } - + r.makeCheckpoint(ctx) err = errors.Join(err, r.checkpointStore.Close(ctx)) - return err } @@ -165,65 +153,53 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { // occur before we stop polling. const emptyPollLimit = 3 -// scrape scrapes the Azure api on interval -func (r *rehydrationReceiver) scrape() { - defer close(r.doneChan) +func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { ticker := time.NewTicker(r.cfg.PollInterval) defer ticker.Stop() var marker *string - // load the previous checkpoint. If not exist should return zero value for time - checkpoint, err := r.checkpointStore.LoadCheckPoint(r.ctx, r.checkpointKey()) + checkpoint, err := r.checkpointStore.LoadCheckPoint(ctx, r.checkpointKey()) if err != nil { r.logger.Warn("Error loading checkpoint, continuing without a previous checkpoint", zap.Error(err)) checkpoint = rehydration.NewCheckpoint() } + r.checkpoint = checkpoint - // Call once before the loop to ensure we do a collection before the first ticker - numBlobsRehydrated := r.rehydrateBlobs(checkpoint, marker) - emptyBlobCounter := checkBlobCount(numBlobsRehydrated, 0) - - for { - select { - case <-r.ctx.Done(): - return - case <-ticker.C: - // Polling for blobs has egress charges so we want to stop polling - // after we stop finding blobs. - if emptyBlobCounter == emptyPollLimit { - return - } - - numBlobsRehydrated := r.rehydrateBlobs(checkpoint, marker) - emptyBlobCounter = checkBlobCount(numBlobsRehydrated, emptyBlobCounter) - } - } -} - -// rehydrateBlobs pulls blob paths from the UI and if they are within the specified -// time range then the blobs will be downloaded and rehydrated. -// The passed in checkpoint and marker will be updated and should be used in the next iteration. -// The count of blobs processed will be returned -func (r *rehydrationReceiver) rehydrateBlobs(checkpoint *rehydration.CheckPoint, marker *string) (numBlobsRehydrated int) { var prefix *string if r.cfg.RootFolder != "" { prefix = &r.cfg.RootFolder } - ctxTimeout, cancel := context.WithTimeout(r.ctx, r.cfg.PollTimeout) - defer cancel() + blobChan := make(chan *azureblob.BlobResults) + errChan := make(chan error) - // get blobs from Azure - blobs, nextMarker, err := r.azureClient.ListBlobs(ctxTimeout, r.cfg.Container, prefix, marker) - if err != nil { - r.logger.Error("Failed to list blobs", zap.Error(err)) - return + cancelCtx, cancel := context.WithCancel(ctx) + r.cancelFunc = cancel + doneChan := make(chan struct{}) + + go r.azureClient.StreamBlobs(cancelCtx, r.cfg.Container, prefix, marker, errChan, blobChan, doneChan) + + for { + select { + case <-ctx.Done(): + return + case <-doneChan: + return + case err := <-errChan: + r.logger.Error("Error streaming blobs", zap.Error(err)) + return + case br := <-blobChan: + r.logger.Debug("Received blobs from stream", zap.Int("number_of_blobs", len(br.Blobs))) + r.rehydrateBlobs(ctx, br.Blobs) + } } - marker = nextMarker +} +func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) { // Go through each blob and parse it's path to determine if we should consume it or not + numProcessedBlobs := 0 for _, blob := range blobs { blobTime, telemetryType, err := rehydration.ParseEntityPath(blob.Name) switch { @@ -231,48 +207,48 @@ func (r *rehydrationReceiver) rehydrateBlobs(checkpoint *rehydration.CheckPoint, r.logger.Debug("Skipping Blob, non-matching blob path", zap.String("blob", blob.Name)) case err != nil: r.logger.Error("Error processing blob path", zap.String("blob", blob.Name), zap.Error(err)) - case checkpoint.ShouldParse(*blobTime, blob.Name): + case r.checkpoint.ShouldParse(*blobTime, blob.Name): // if the blob is not in the specified time range or not of the telemetry type supported by this receiver // then skip consuming it. if !rehydration.IsInTimeRange(*blobTime, r.startingTime, r.endingTime) || telemetryType != r.supportedTelemetry { continue } + r.lastBlob = blob + r.lastBlobTime = blobTime + // Process and consume the blob at the given path - if err := r.processBlob(blob); err != nil { + if err := r.processBlob(ctx, blob); err != nil { r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) continue } - numBlobsRehydrated++ - - // Update and save the checkpoint with the most recently processed blob - checkpoint.UpdateCheckpoint(*blobTime, blob.Name) - if err := r.checkpointStore.SaveCheckpoint(r.ctx, r.checkpointKey(), checkpoint); err != nil { - r.logger.Error("Error while saving checkpoint", zap.Error(err)) - } + numProcessedBlobs++ + r.logger.Debug("Processed blob", zap.String("blob", blob.Name), zap.Int("num_processed_blobs", numProcessedBlobs)) // Delete blob if configured to do so if r.cfg.DeleteOnRead { - if err := r.azureClient.DeleteBlob(r.ctx, r.cfg.Container, blob.Name); err != nil { + if err := r.azureClient.DeleteBlob(ctx, r.cfg.Container, blob.Name); err != nil { r.logger.Error("Error while attempting to delete blob", zap.String("blob", blob.Name), zap.Error(err)) } } } } - return + if err := r.makeCheckpoint(ctx); err != nil { + r.logger.Error("Error while saving checkpoint", zap.Error(err)) + } } // processBlob does the following: // 1. Downloads the blob // 2. Decompresses the blob if applicable // 3. Pass the blob to the consumer -func (r *rehydrationReceiver) processBlob(blob *azureblob.BlobInfo) error { +func (r *rehydrationReceiver) processBlob(ctx context.Context, blob *azureblob.BlobInfo) error { // Allocate a buffer the size of the blob. If the buffer isn't big enough download errors. blobBuffer := make([]byte, blob.Size) - size, err := r.azureClient.DownloadBlob(r.ctx, r.cfg.Container, blob.Name, blobBuffer) + size, err := r.azureClient.DownloadBlob(ctx, r.cfg.Container, blob.Name, blobBuffer) if err != nil { return fmt.Errorf("download blob: %w", err) } @@ -291,7 +267,7 @@ func (r *rehydrationReceiver) processBlob(blob *azureblob.BlobInfo) error { return fmt.Errorf("unsupported file type: %s", ext) } - if err := r.consumer.Consume(r.ctx, blobBuffer); err != nil { + if err := r.consumer.Consume(ctx, blobBuffer); err != nil { return fmt.Errorf("consume: %w", err) } return nil @@ -305,16 +281,13 @@ func (r *rehydrationReceiver) checkpointKey() string { return fmt.Sprintf("%s_%s_%s", checkpointStorageKey, r.id, r.supportedTelemetry.String()) } -// checkBlobCount checks the number of blobs rehydrated and the current state of the -// empty counter. If zero blobs were rehydrated increment the counter. -// If there were blobs rehydrated reset the counter as we want to track consecutive zero sized polls. -func checkBlobCount(numBlobsRehydrated, emptyBlobsCounter int) int { - switch { - case emptyBlobsCounter == emptyPollLimit: // If we are at the limit return the limit - return emptyPollLimit - case numBlobsRehydrated == 0: // If no blobs were rehydrated then increment the empty blobs counter - return emptyBlobsCounter + 1 - default: // Default case is numBlobsRehydrated > 0 so reset emptyBlobsCounter to 0 - return 0 +func (r *rehydrationReceiver) makeCheckpoint(ctx context.Context) error { + if r.lastBlob == nil || r.lastBlobTime == nil { + return nil } + + r.logger.Debug("Making checkpoint", zap.String("blob", r.lastBlob.Name), zap.Time("time", *r.lastBlobTime)) + r.checkpoint.UpdateCheckpoint(*r.lastBlobTime, r.lastBlob.Name) + r.checkpointStore.SaveCheckpoint(ctx, r.checkpointKey(), r.checkpoint) + return nil } diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index 3144d6347..f26405797 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -496,10 +496,10 @@ func Test_processBlob(t *testing.T) { }, consumer: mockConsumer, azureClient: mockClient, - ctx: context.Background(), + // ctx: context.Background(), } - err := r.processBlob(tc.info) + err := r.processBlob(context.Background(), tc.info) if tc.expectedErr == nil { require.NoError(t, err) } else { @@ -517,7 +517,7 @@ func setNewAzureBlobClient(t *testing.T) *blobmocks.MockBlobClient { mockClient := blobmocks.NewMockBlobClient(t) - newAzureBlobClient = func(_ string) (azureblob.BlobClient, error) { + newAzureBlobClient = func(_ string, _ int, _ int) (azureblob.BlobClient, error) { return mockClient, nil } From 7cce249fe435a5045f87f0789408f018f137b0ac Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 9 Jan 2025 14:25:17 -0500 Subject: [PATCH 02/17] fix tests --- .../config_test.go | 52 +++++++ .../factory_test.go | 2 + .../internal/azureblob/blob_client.go | 12 +- .../azureblob/mocks/mock_blob_client.go | 17 +-- .../azureblobrehydrationreceiver/receiver.go | 51 ++++--- .../receiver_test.go | 144 +++++++++--------- 6 files changed, 171 insertions(+), 107 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/config_test.go b/receiver/azureblobrehydrationreceiver/config_test.go index 8ddb34ef9..1cf647c34 100644 --- a/receiver/azureblobrehydrationreceiver/config_test.go +++ b/receiver/azureblobrehydrationreceiver/config_test.go @@ -39,6 +39,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("connection_string is required"), }, @@ -53,6 +55,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("container is required"), }, @@ -67,6 +71,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("starting_time is invalid: missing value"), }, @@ -81,6 +87,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("ending_time is invalid: missing value"), }, @@ -95,6 +103,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("starting_time is invalid: invalid timestamp"), }, @@ -109,6 +119,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("ending_time is invalid: invalid timestamp"), }, @@ -123,6 +135,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("ending_time must be at least one minute after starting_time"), }, @@ -137,6 +151,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Millisecond, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("poll_interval must be at least one second"), }, @@ -151,9 +167,43 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second * 2, PollTimeout: time.Millisecond, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("poll_timeout must be at least one second"), }, + { + desc: "Bad batch_size", + cfg: &Config{ + ConnectionString: "connection_string", + Container: "container", + RootFolder: "root", + StartingTime: "2023-10-02T17:00", + EndingTime: "2023-10-02T17:01", + DeleteOnRead: false, + PollInterval: time.Second, + PollTimeout: time.Second * 10, + BatchSize: 0, + PageSize: 1000, + }, + expectErr: errors.New("batch_size must be greater than 0"), + }, + { + desc: "Bad page_size", + cfg: &Config{ + ConnectionString: "connection_string", + Container: "container", + RootFolder: "root", + StartingTime: "2023-10-02T17:00", + EndingTime: "2023-10-02T17:01", + DeleteOnRead: false, + PollInterval: time.Second, + PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 0, + }, + expectErr: errors.New("page_size must be greater than 0"), + }, { desc: "Valid config", cfg: &Config{ @@ -165,6 +215,8 @@ func TestConfigValidate(t *testing.T) { DeleteOnRead: false, PollInterval: time.Second, PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: nil, }, diff --git a/receiver/azureblobrehydrationreceiver/factory_test.go b/receiver/azureblobrehydrationreceiver/factory_test.go index ce8e95ff0..3b23f7646 100644 --- a/receiver/azureblobrehydrationreceiver/factory_test.go +++ b/receiver/azureblobrehydrationreceiver/factory_test.go @@ -26,6 +26,8 @@ func Test_createDefaultConfig(t *testing.T) { DeleteOnRead: false, PollInterval: time.Minute, PollTimeout: time.Second * 30, + BatchSize: 100, + PageSize: 1000, } componentCfg := createDefaultConfig() diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index 222e66c1c..b77a77071 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -44,7 +44,7 @@ type BlobClient interface { // StreamBlobs will stream blobs to the blobChan and errors to the errChan, generally if an errChan gets an item // then the stream should be stopped - StreamBlobs(ctx context.Context, container string, prefix, marker *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) + StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) } // AzureClient is an implementation of the BlobClient for Azure @@ -118,9 +118,10 @@ type BlobResults struct { // StreamBlobs will stream blobs to the blobChan and errors to the errChan, generally if an errChan gets an item // then the stream should be stopped -func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix, beginningMarker *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) { +func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) { + var marker *string pager := a.azClient.NewListBlobsFlatPager(container, &azblob.ListBlobsFlatOptions{ - Marker: beginningMarker, + Marker: marker, Prefix: prefix, MaxResults: &a.pageSize, }) @@ -161,7 +162,7 @@ func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix, if len(batch) == int(a.batchSize) { blobChan <- &BlobResults{ Blobs: batch, - LastMarker: resp.NextMarker, + LastMarker: marker, } batch = []*BlobInfo{} } @@ -175,8 +176,9 @@ func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix, emptyPollCount = 0 blobChan <- &BlobResults{ Blobs: batch, - LastMarker: resp.NextMarker, + LastMarker: marker, } + marker = resp.NextMarker } } diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go index 32dbce7bc..621e9211b 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go @@ -200,9 +200,9 @@ func (_c *MockBlobClient_ListBlobs_Call) RunAndReturn(run func(context.Context, return _c } -// StreamBlobs provides a mock function with given fields: ctx, container, prefix, marker, errChan, blobChan, doneChan -func (_m *MockBlobClient) StreamBlobs(ctx context.Context, container string, prefix *string, marker *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{}) { - _m.Called(ctx, container, prefix, marker, errChan, blobChan, doneChan) +// StreamBlobs provides a mock function with given fields: ctx, container, prefix, errChan, blobChan, doneChan +func (_m *MockBlobClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{}) { + _m.Called(ctx, container, prefix, errChan, blobChan, doneChan) } // MockBlobClient_StreamBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamBlobs' @@ -214,17 +214,16 @@ type MockBlobClient_StreamBlobs_Call struct { // - ctx context.Context // - container string // - prefix *string -// - marker *string // - errChan chan error // - blobChan chan *azureblob.BlobResults // - doneChan chan struct{} -func (_e *MockBlobClient_Expecter) StreamBlobs(ctx interface{}, container interface{}, prefix interface{}, marker interface{}, errChan interface{}, blobChan interface{}, doneChan interface{}) *MockBlobClient_StreamBlobs_Call { - return &MockBlobClient_StreamBlobs_Call{Call: _e.mock.On("StreamBlobs", ctx, container, prefix, marker, errChan, blobChan, doneChan)} +func (_e *MockBlobClient_Expecter) StreamBlobs(ctx interface{}, container interface{}, prefix interface{}, errChan interface{}, blobChan interface{}, doneChan interface{}) *MockBlobClient_StreamBlobs_Call { + return &MockBlobClient_StreamBlobs_Call{Call: _e.mock.On("StreamBlobs", ctx, container, prefix, errChan, blobChan, doneChan)} } -func (_c *MockBlobClient_StreamBlobs_Call) Run(run func(ctx context.Context, container string, prefix *string, marker *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{})) *MockBlobClient_StreamBlobs_Call { +func (_c *MockBlobClient_StreamBlobs_Call) Run(run func(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{})) *MockBlobClient_StreamBlobs_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(*string), args[4].(chan error), args[5].(chan *azureblob.BlobResults), args[6].(chan struct{})) + run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(chan error), args[4].(chan *azureblob.BlobResults), args[5].(chan struct{})) }) return _c } @@ -234,7 +233,7 @@ func (_c *MockBlobClient_StreamBlobs_Call) Return() *MockBlobClient_StreamBlobs_ return _c } -func (_c *MockBlobClient_StreamBlobs_Call) RunAndReturn(run func(context.Context, string, *string, *string, chan error, chan *azureblob.BlobResults, chan struct{})) *MockBlobClient_StreamBlobs_Call { +func (_c *MockBlobClient_StreamBlobs_Call) RunAndReturn(run func(context.Context, string, *string, chan error, chan *azureblob.BlobResults, chan struct{})) *MockBlobClient_StreamBlobs_Call { _c.Call.Return(run) return _c } diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 91982483d..60b88f730 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "path/filepath" + "sync" "time" "github.com/observiq/bindplane-otel-collector/internal/rehydration" @@ -43,13 +44,18 @@ type rehydrationReceiver struct { checkpoint *rehydration.CheckPoint checkpointStore rehydration.CheckpointStorer + blobChan chan *azureblob.BlobResults + errChan chan error + doneChan chan struct{} + + mut *sync.Mutex + lastBlob *azureblob.BlobInfo lastBlobTime *time.Time startingTime time.Time endingTime time.Time - started bool cancelFunc context.CancelFunc } @@ -119,6 +125,10 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (* checkpointStore: rehydration.NewNopStorage(), startingTime: startingTime, endingTime: endingTime, + blobChan: make(chan *azureblob.BlobResults), + errChan: make(chan error), + doneChan: make(chan struct{}), + mut: &sync.Mutex{}, }, nil } @@ -131,8 +141,6 @@ func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) er } r.checkpointStore = checkpointStore } - - r.started = true go r.streamRehydrateBlobs(ctx) return nil } @@ -144,21 +152,15 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { if r.cancelFunc != nil { r.cancelFunc() } - r.makeCheckpoint(ctx) + if err := r.makeCheckpoint(ctx); err != nil { + r.logger.Error("Error while saving checkpoint", zap.Error(err)) + err = errors.Join(err, err) + } err = errors.Join(err, r.checkpointStore.Close(ctx)) return err } -// emptyPollLimit is the number of consecutive empty polling cycles that can -// occur before we stop polling. -const emptyPollLimit = 3 - func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { - ticker := time.NewTicker(r.cfg.PollInterval) - defer ticker.Stop() - - var marker *string - checkpoint, err := r.checkpointStore.LoadCheckPoint(ctx, r.checkpointKey()) if err != nil { r.logger.Warn("Error loading checkpoint, continuing without a previous checkpoint", zap.Error(err)) @@ -171,30 +173,28 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { prefix = &r.cfg.RootFolder } - blobChan := make(chan *azureblob.BlobResults) - errChan := make(chan error) - cancelCtx, cancel := context.WithCancel(ctx) r.cancelFunc = cancel - doneChan := make(chan struct{}) - go r.azureClient.StreamBlobs(cancelCtx, r.cfg.Container, prefix, marker, errChan, blobChan, doneChan) + startTime := time.Now() + r.logger.Info("Starting rehydration", zap.Time("startTime", startTime)) + + go r.azureClient.StreamBlobs(cancelCtx, r.cfg.Container, prefix, r.errChan, r.blobChan, r.doneChan) for { select { case <-ctx.Done(): return - case <-doneChan: + case <-r.doneChan: + r.logger.Info("Finished rehydrating blobs", zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return - case err := <-errChan: - r.logger.Error("Error streaming blobs", zap.Error(err)) + case err := <-r.errChan: + r.logger.Error("Error streaming blobs, stopping rehydration", zap.Error(err), zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return - case br := <-blobChan: - r.logger.Debug("Received blobs from stream", zap.Int("number_of_blobs", len(br.Blobs))) + case br := <-r.blobChan: r.rehydrateBlobs(ctx, br.Blobs) } } - } func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) { @@ -285,8 +285,9 @@ func (r *rehydrationReceiver) makeCheckpoint(ctx context.Context) error { if r.lastBlob == nil || r.lastBlobTime == nil { return nil } - r.logger.Debug("Making checkpoint", zap.String("blob", r.lastBlob.Name), zap.Time("time", *r.lastBlobTime)) + r.mut.Lock() + defer r.mut.Unlock() r.checkpoint.UpdateCheckpoint(*r.lastBlobTime, r.lastBlob.Name) r.checkpointStore.SaveCheckpoint(ctx, r.checkpointKey(), r.checkpoint) return nil diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index f26405797..053fd8c81 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -19,14 +19,9 @@ import ( "compress/gzip" "context" "errors" - "sync/atomic" "testing" "time" - "github.com/observiq/bindplane-otel-collector/internal/rehydration" - "github.com/observiq/bindplane-otel-collector/internal/testutils" - "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob" - blobmocks "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -34,6 +29,11 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pipeline" "go.uber.org/zap" + + "github.com/observiq/bindplane-otel-collector/internal/rehydration" + "github.com/observiq/bindplane-otel-collector/internal/testutils" + "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob" + blobmocks "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks" ) func Test_newMetricsReceiver(t *testing.T) { @@ -116,27 +116,6 @@ func Test_fullRehydration(t *testing.T) { DeleteOnRead: false, } - t.Run("empty blob polling", func(t *testing.T) { - var listCounter atomic.Int32 - - // Setup mocks - mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Times(3).Return([]*azureblob.BlobInfo{}, nil, nil). - Run(func(_ mock.Arguments) { - listCounter.Add(1) - }) - - // Create new receiver - testConsumer := &consumertest.MetricsSink{} - r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - - checkFunc := func() bool { - return listCounter.Load() == 3 - } - runRehydrationValidateTest(t, r, checkFunc) - }) - t.Run("metrics", func(t *testing.T) { // Test data metrics, jsonBytes := testutils.GenerateTestMetrics(t) @@ -153,24 +132,29 @@ func Test_fullRehydration(t *testing.T) { }, } + // Create new receiver + targetBlob := returnedBlobInfo[0] // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + testConsumer := &consumertest.MetricsSink{} + r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) + mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) - copy(buf, jsonBytes) - return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.MetricsSink{} - r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.DataPointCount() == metrics.DataPointCount() } @@ -195,23 +179,26 @@ func Test_fullRehydration(t *testing.T) { } targetBlob := returnedBlobInfo[0] + mockClient := setNewAzureBlobClient(t) + + // Create new receiver + testConsumer := &consumertest.TracesSink{} + r, err := newTracesReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) // Setup mocks - mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) - copy(buf, jsonBytes) - return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.TracesSink{} - r, err := newTracesReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.SpanCount() == traces.SpanCount() } @@ -239,7 +226,17 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -248,11 +245,6 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -281,7 +273,17 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -290,11 +292,6 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -327,7 +324,17 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -335,12 +342,8 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - mockClient.EXPECT().DeleteBlob(mock.Anything, cfg.Container, targetBlob.Name).Return(nil) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) + mockClient.EXPECT().DeleteBlob(mock.Anything, cfg.Container, targetBlob.Name).Return(nil) checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() @@ -375,7 +378,18 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -384,11 +398,6 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -496,7 +505,6 @@ func Test_processBlob(t *testing.T) { }, consumer: mockConsumer, azureClient: mockClient, - // ctx: context.Background(), } err := r.processBlob(context.Background(), tc.info) From f467c37f35c368e8defbb7b0d5a221a81834d199 Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 9 Jan 2025 14:44:08 -0500 Subject: [PATCH 03/17] remove polling parameters and fix gosec error --- .../azureblobrehydrationreceiver/README.md | 91 +++---------------- .../azureblobrehydrationreceiver/config.go | 15 --- .../config_test.go | 53 ----------- .../azureblobrehydrationreceiver/factory.go | 3 - .../factory_test.go | 3 - .../azureblobrehydrationreceiver/receiver.go | 3 +- .../receiver_test.go | 1 - 7 files changed, 14 insertions(+), 155 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/README.md b/receiver/azureblobrehydrationreceiver/README.md index 906b9ecb9..f017313ac 100644 --- a/receiver/azureblobrehydrationreceiver/README.md +++ b/receiver/azureblobrehydrationreceiver/README.md @@ -21,17 +21,18 @@ This is not a traditional receiver that continually produces data but rather reh a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip. ## Configuration -| Field | Type | Default | Required | Description | -|--------------------|-----------|------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. | -| container | string | | `true` | The name of the container to rehydrate from. | -| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. | -| starting_time | string | | `true ` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | -| ending_time | string | | `true ` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | -| delete_on_read | bool | `false` | `false ` | If `true` the blob will be deleted after being rehydrated. | -| poll_interval | string | `1m` | `false ` | How often to read a new set of blobs. This value is mostly to control how often the blob API is called to ensure once rehydration is done the receiver isn't making too many API calls. | -| poll_timeout | string | `30s` | `false ` | The timeout used when reading blobs from Azure. | -| storage | string | | `false ` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. | + +| Field | Type | Default | Required | Description | +|-------|------|---------|----------|-------------| +| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. | +| container | string | | `true` | The name of the container to rehydrate from. | +| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. | +| starting_time | string | | `true` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | +| ending_time | string | | `true` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | +| delete_on_read | bool | `false` | `false` | If `true` the blob will be deleted after being rehydrated. | +| storage | string | | `false` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. | +| batch_size | int | `100` | `false` | The number of blobs to continue processing in the pipeline before sending more data to the pipeline. | +| page_size | int | `1000` | `false` | The maximum number of blobs to request in a single API call. | ## Example Configuration @@ -40,70 +41,4 @@ This is not a traditional receiver that continually produces data but rather reh This configuration specifies a `connection_string`, `container`, `starting_time`, and `ending_time`. This will rehydrate all blobs in the container `my-container` that have a path that represents they were created between `1:00pm` and `2:30pm` UTC time on `October 1, 2023`. -Such a path could look like the following: -``` -year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json -year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json -year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json -``` - -```yaml -azureblobrehydration: - connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" - container: "my-container" - starting_time: 2023-10-01T13:00 - ending_time: 2023-10-01T14:30 -``` - -### Using Storage Extension Configuration - -This configuration shows using a storage extension to track rehydration progress over agent restarts. The `storage` field is set to the component ID of the storage extension. - - -```yaml -extensions: - file_storage: - directory: $OIQ_OTEL_COLLECTOR_HOME/storage - -receivers: - azureblobrehydration: - connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" - container: "my-container" - starting_time: 2023-10-01T13:00 - ending_time: 2023-10-01T14:30 - storage: "file_storage" -``` - -### Root Folder Configuration - -This configuration specifies an additional field `root_folder` to match the `root_folder` value of the Azure Blob Exporter. -The `root_folder` value in the exporter will prefix the blob path with the root folder and it needs to be accounted for in the rehydration receiver. - -Such a path could look like the following: -``` -root/year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json -root/year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json -root/year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json -``` - -```yaml -azureblobrehydration: - connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" - container: "my-container" - starting_time: 2023-10-01T13:00 - ending_time: 2023-10-01T14:30 - root_folder: "root" -``` - -### Delete on read Configuration - -This configuration enables the `delete_on_read` functionality which will delete a blob from Azure after it has been successfully rehydrated into OTLP data and sent onto the next component in the pipeline. - -```yaml -azureblobrehydration: - connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" - container: "my-container" - starting_time: 2023-10-01T13:00 - ending_time: 2023-10-01T14:30 - delete_on_read: true -``` +Such a path could look like the following: \ No newline at end of file diff --git a/receiver/azureblobrehydrationreceiver/config.go b/receiver/azureblobrehydrationreceiver/config.go index 08427a276..8192870b4 100644 --- a/receiver/azureblobrehydrationreceiver/config.go +++ b/receiver/azureblobrehydrationreceiver/config.go @@ -51,13 +51,6 @@ type Config struct { // PageSize is the number of blobs to request from the Azure API at a time. (default 1000) PageSize int `mapstructure:"page_size"` - // PollInterval is the interval at which the Azure API is scanned for blobs. - // Default value of 1m - PollInterval time.Duration `mapstructure:"poll_interval"` - - // PollTimeout is the timeout for the Azure API to scan for blobs. - PollTimeout time.Duration `mapstructure:"poll_timeout"` - // ID of the storage extension to use for storing progress StorageID *component.ID `mapstructure:"storage"` } @@ -95,14 +88,6 @@ func (c *Config) Validate() error { return errors.New("page_size must be greater than 0") } - if c.PollInterval < time.Second { - return errors.New("poll_interval must be at least one second") - } - - if c.PollTimeout < time.Second { - return errors.New("poll_timeout must be at least one second") - } - return nil } diff --git a/receiver/azureblobrehydrationreceiver/config_test.go b/receiver/azureblobrehydrationreceiver/config_test.go index 1cf647c34..33b50e0e7 100644 --- a/receiver/azureblobrehydrationreceiver/config_test.go +++ b/receiver/azureblobrehydrationreceiver/config_test.go @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote import ( "errors" "testing" - "time" "github.com/stretchr/testify/require" ) @@ -37,8 +36,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 1000, }, @@ -53,8 +50,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 1000, }, @@ -69,8 +64,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 1000, }, @@ -85,8 +78,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 1000, }, @@ -101,8 +92,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "invalid_time", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 1000, }, @@ -117,8 +106,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "invalid_time", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 1000, }, @@ -133,45 +120,11 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T16:00", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 1000, }, expectErr: errors.New("ending_time must be at least one minute after starting_time"), }, - { - desc: "Bad poll_interval", - cfg: &Config{ - ConnectionString: "connection_string", - Container: "container", - RootFolder: "root", - StartingTime: "2023-10-02T17:00", - EndingTime: "2023-10-02T17:01", - DeleteOnRead: false, - PollInterval: time.Millisecond, - PollTimeout: time.Second * 10, - BatchSize: 100, - PageSize: 1000, - }, - expectErr: errors.New("poll_interval must be at least one second"), - }, - { - desc: "Bad poll_timeout", - cfg: &Config{ - ConnectionString: "connection_string", - Container: "container", - RootFolder: "root", - StartingTime: "2023-10-02T17:00", - EndingTime: "2023-10-02T17:01", - DeleteOnRead: false, - PollInterval: time.Second * 2, - PollTimeout: time.Millisecond, - BatchSize: 100, - PageSize: 1000, - }, - expectErr: errors.New("poll_timeout must be at least one second"), - }, { desc: "Bad batch_size", cfg: &Config{ @@ -181,8 +134,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 0, PageSize: 1000, }, @@ -197,8 +148,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 0, }, @@ -213,8 +162,6 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, BatchSize: 100, PageSize: 1000, }, diff --git a/receiver/azureblobrehydrationreceiver/factory.go b/receiver/azureblobrehydrationreceiver/factory.go index f887178c0..3309b59a2 100644 --- a/receiver/azureblobrehydrationreceiver/factory.go +++ b/receiver/azureblobrehydrationreceiver/factory.go @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote import ( "context" "errors" - "time" "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/metadata" "go.opentelemetry.io/collector/component" @@ -43,8 +42,6 @@ func NewFactory() receiver.Factory { func createDefaultConfig() component.Config { return &Config{ DeleteOnRead: false, - PollInterval: time.Minute, - PollTimeout: time.Second * 30, BatchSize: 100, PageSize: 1000, } diff --git a/receiver/azureblobrehydrationreceiver/factory_test.go b/receiver/azureblobrehydrationreceiver/factory_test.go index 3b23f7646..771089d8f 100644 --- a/receiver/azureblobrehydrationreceiver/factory_test.go +++ b/receiver/azureblobrehydrationreceiver/factory_test.go @@ -16,7 +16,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote import ( "testing" - "time" "github.com/stretchr/testify/require" ) @@ -24,8 +23,6 @@ import ( func Test_createDefaultConfig(t *testing.T) { expectedCfg := &Config{ DeleteOnRead: false, - PollInterval: time.Minute, - PollTimeout: time.Second * 30, BatchSize: 100, PageSize: 1000, } diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 60b88f730..5cdee4876 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -289,6 +289,5 @@ func (r *rehydrationReceiver) makeCheckpoint(ctx context.Context) error { r.mut.Lock() defer r.mut.Unlock() r.checkpoint.UpdateCheckpoint(*r.lastBlobTime, r.lastBlob.Name) - r.checkpointStore.SaveCheckpoint(ctx, r.checkpointKey(), r.checkpoint) - return nil + return r.checkpointStore.SaveCheckpoint(ctx, r.checkpointKey(), r.checkpoint) } diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index 053fd8c81..88f5dc5c2 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -111,7 +111,6 @@ func Test_fullRehydration(t *testing.T) { cfg := &Config{ StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T18:00", - PollInterval: 10 * time.Millisecond, Container: "container", DeleteOnRead: false, } From 02de7baad2401f5acc374d8f94c7cd5d8090b046 Mon Sep 17 00:00:00 2001 From: schmikei Date: Fri, 10 Jan 2025 10:53:47 -0500 Subject: [PATCH 04/17] some more tests --- .../azureblobrehydrationreceiver/README.md | 2 +- .../internal/azureblob/blob_client.go | 56 ++----- .../internal/azureblob/blob_client_test.go | 137 ++++++++++++++++++ .../azureblobrehydrationreceiver/receiver.go | 14 +- 4 files changed, 162 insertions(+), 47 deletions(-) create mode 100644 receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go diff --git a/receiver/azureblobrehydrationreceiver/README.md b/receiver/azureblobrehydrationreceiver/README.md index f017313ac..697d90946 100644 --- a/receiver/azureblobrehydrationreceiver/README.md +++ b/receiver/azureblobrehydrationreceiver/README.md @@ -13,7 +13,7 @@ This is not a traditional receiver that continually produces data but rather reh - Traces ## How it works -1. The receiver polls blob storage for all blobs in the specified container. +1. The receiver polls blob storage for pages of blobs in the specified container. There is no current way of specifying a time range to rehydrate so any blobs outside fo the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration. 2. The receiver will parse each blob's path to determine if it matches a path created by the [Azure Blob Exporter](../../exporter/azureblobexporter/README.md#blob-path). 3. If the blob path is from the exporter, the receiver will parse the timestamp represented by the path. 4. If the timestamp is within the configured range the receiver will download the blob and parse its contents into OTLP data. diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index b77a77071..f0ea14316 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -19,6 +19,7 @@ import ( "context" "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" ) @@ -32,9 +33,6 @@ type BlobInfo struct { // //go:generate mockery --name BlobClient --output ./mocks --with-expecter --filename mock_blob_client.go --structname MockBlobClient type BlobClient interface { - // ListBlobs returns a list of blobInfo objects present in the container with the given prefix - ListBlobs(ctx context.Context, container string, prefix, marker *string) ([]*BlobInfo, *string, error) - // DownloadBlob downloads the contents of the blob into the supplied buffer. // It will return the count of bytes used in the buffer. DownloadBlob(ctx context.Context, container, blobPath string, buf []byte) (int64, error) @@ -42,14 +40,22 @@ type BlobClient interface { // DeleteBlob deletes the blob in the specified container DeleteBlob(ctx context.Context, container, blobPath string) error - // StreamBlobs will stream blobs to the blobChan and errors to the errChan, generally if an errChan gets an item + // StreamBlobs will stream BlobInfo to the blobChan and errors to the errChan, generally if an errChan gets an item // then the stream should be stopped StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) } +type blobClient interface { + NewListBlobsFlatPager(containerName string, options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse] + DownloadBuffer(ctx context.Context, containerName string, blobPath string, buffer []byte, options *azblob.DownloadBufferOptions) (int64, error) + DeleteBlob(ctx context.Context, containerName string, blobPath string, options *azblob.DeleteBlobOptions) (azblob.DeleteBlobResponse, error) +} + +var _ blobClient = &azblob.Client{} + // AzureClient is an implementation of the BlobClient for Azure type AzureClient struct { - azClient *azblob.Client + azClient blobClient batchSize int pageSize int32 } @@ -68,46 +74,6 @@ func NewAzureBlobClient(connectionString string, batchSize, pageSize int) (BlobC }, nil } -// ListBlobs returns a list of blobInfo objects present in the container with the given prefix -func (a *AzureClient) ListBlobs(ctx context.Context, container string, prefix, marker *string) ([]*BlobInfo, *string, error) { - listOptions := &azblob.ListBlobsFlatOptions{ - Marker: marker, - Prefix: prefix, - MaxResults: &a.pageSize, - } - - pager := a.azClient.NewListBlobsFlatPager(container, listOptions) - - var nextMarker *string - blobs := make([]*BlobInfo, 0) - for pager.More() { - resp, err := pager.NextPage(ctx) - if err != nil { - return nil, nil, fmt.Errorf("listBlobs: %w", err) - } - - for _, blob := range resp.Segment.BlobItems { - // Skip deleted blobs - if blob.Deleted != nil && *blob.Deleted { - continue - } - // All blob fields are pointers so check all pointers we need before we try to process it - if blob.Name == nil || blob.Properties == nil || blob.Properties.ContentLength == nil { - continue - } - - info := &BlobInfo{ - Name: *blob.Name, - Size: *blob.Properties.ContentLength, - } - blobs = append(blobs, info) - } - nextMarker = resp.NextMarker - } - - return blobs, nextMarker, nil -} - const emptyPollLimit = 3 // BlobResults contains the blobs for the receiver to process and the last marker diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go new file mode 100644 index 000000000..4975b40df --- /dev/null +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go @@ -0,0 +1,137 @@ +package azureblob + +import ( + "context" + "errors" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestNewAzureBlobClient(t *testing.T) { + tests := []struct { + name string + connectionStr string + batchSize int + pageSize int + expectedError bool + }{ + { + name: "Invalid connection string", + connectionStr: "invalid", + batchSize: 100, + pageSize: 1000, + expectedError: true, + }, + { + name: "Valid connection string", + connectionStr: "DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;", + batchSize: 100, + pageSize: 1000, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewAzureBlobClient(tt.connectionStr, tt.batchSize, tt.pageSize) + if tt.expectedError { + require.Error(t, err) + require.Nil(t, client) + } else { + require.NoError(t, err) + require.NotNil(t, client) + } + }) + } +} + +func TestDownloadBlob(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + testData := []byte("test data content") + buf := make([]byte, 1024) + + mockClient.On("DownloadBuffer", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + buf := args.Get(3).([]byte) + copy(buf, testData) + }).Return(int64(len(testData)), nil) + + t.Run("successful download", func(t *testing.T) { + bytesDownloaded, err := client.DownloadBlob(ctx, container, blobPath, buf) + require.NoError(t, err) + require.Equal(t, int64(len(testData)), bytesDownloaded) + require.Equal(t, string(testData), string(buf[:len(testData)])) + }) +} + +func TestDeleteBlobSuccess(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + + mockClient.On("DeleteBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(azblob.DeleteBlobResponse{}, nil) + err := client.DeleteBlob(ctx, container, blobPath) + require.NoError(t, err) + +} + +func TestDeleteBlobFailure(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + + mockClient.On("DeleteBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(azblob.DeleteBlobResponse{}, errors.New("failed to delete")) + err := client.DeleteBlob(ctx, container, blobPath) + require.Error(t, err) + require.Equal(t, "failed to delete", err.Error()) +} + +// mockAzureClient is a mock implementation of the Azure blob client +type mockAzureClient struct { + mock.Mock +} + +func (m *mockAzureClient) NewListBlobsFlatPager(containerName string, options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse] { + args := m.Called(containerName, options) + return args.Get(0).(*runtime.Pager[azblob.ListBlobsFlatResponse]) +} + +func (m *mockAzureClient) DownloadBuffer(ctx context.Context, containerName string, blobPath string, buffer []byte, options *azblob.DownloadBufferOptions) (int64, error) { + args := m.Called(ctx, containerName, blobPath, buffer, options) + return args.Get(0).(int64), args.Error(1) +} + +func (m *mockAzureClient) DeleteBlob(ctx context.Context, containerName string, blobPath string, options *azblob.DeleteBlobOptions) (azblob.DeleteBlobResponse, error) { + args := m.Called(ctx, containerName, blobPath, options) + return args.Get(0).(azblob.DeleteBlobResponse), args.Error(1) +} diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 5cdee4876..07c887090 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -132,6 +132,8 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (* }, nil } +// move delimiter to transform block + // Start starts the rehydration receiver func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) error { if r.cfg.StorageID != nil { @@ -201,6 +203,12 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure // Go through each blob and parse it's path to determine if we should consume it or not numProcessedBlobs := 0 for _, blob := range blobs { + select { + case <-ctx.Done(): + return + default: + } + blobTime, telemetryType, err := rehydration.ParseEntityPath(blob.Name) switch { case errors.Is(err, rehydration.ErrInvalidEntityPath): @@ -219,7 +227,11 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure // Process and consume the blob at the given path if err := r.processBlob(ctx, blob); err != nil { - r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) + // If the error is because the context was canceled, then we don't want to log it + if !errors.Is(err, context.Canceled) { + r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) + } + continue } From 782959641c139813e441c626397ca0332f2179fc Mon Sep 17 00:00:00 2001 From: schmikei Date: Fri, 10 Jan 2025 11:06:08 -0500 Subject: [PATCH 05/17] add license --- .../internal/azureblob/blob_client_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go index 4975b40df..8fa446bdc 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go @@ -1,3 +1,17 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package azureblob import ( From ac47ac74d0397347376aebbe88262f3a2e428779 Mon Sep 17 00:00:00 2001 From: schmikei Date: Mon, 13 Jan 2025 09:33:40 -0500 Subject: [PATCH 06/17] remove extra debug line --- receiver/azureblobrehydrationreceiver/receiver.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 07c887090..df332b9bc 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -201,7 +201,6 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) { // Go through each blob and parse it's path to determine if we should consume it or not - numProcessedBlobs := 0 for _, blob := range blobs { select { case <-ctx.Done(): @@ -235,8 +234,7 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure continue } - numProcessedBlobs++ - r.logger.Debug("Processed blob", zap.String("blob", blob.Name), zap.Int("num_processed_blobs", numProcessedBlobs)) + r.logger.Debug("Processed blob", zap.String("blob", blob.Name)) // Delete blob if configured to do so if r.cfg.DeleteOnRead { From aec8cffb103e1f36053d9a5e6c1692d65fa03754 Mon Sep 17 00:00:00 2001 From: schmikei Date: Mon, 13 Jan 2025 14:11:15 -0500 Subject: [PATCH 07/17] address PR feedback --- .../azureblobrehydrationreceiver/README.md | 87 ++++++++++++++++++- .../azureblobrehydrationreceiver/config.go | 21 ++++- .../azureblobrehydrationreceiver/receiver.go | 15 ++-- 3 files changed, 113 insertions(+), 10 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/README.md b/receiver/azureblobrehydrationreceiver/README.md index 697d90946..0a4811843 100644 --- a/receiver/azureblobrehydrationreceiver/README.md +++ b/receiver/azureblobrehydrationreceiver/README.md @@ -31,9 +31,13 @@ This is not a traditional receiver that continually produces data but rather reh | ending_time | string | | `true` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | | delete_on_read | bool | `false` | `false` | If `true` the blob will be deleted after being rehydrated. | | storage | string | | `false` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. | +| poll_interval* | string | | `false` | The interval at which the Azure API is scanned for blobs. | +| poll_timeout* | string | | `false` | The timeout for the Azure API to scan for blobs. | | batch_size | int | `100` | `false` | The number of blobs to continue processing in the pipeline before sending more data to the pipeline. | | page_size | int | `1000` | `false` | The maximum number of blobs to request in a single API call. | +> Deprecated*: `poll_interval` and `poll_timeout` are no longer supported and `batch_size`/`page_size` should be used instead. + ## Example Configuration ### Basic Configuration @@ -41,4 +45,85 @@ This is not a traditional receiver that continually produces data but rather reh This configuration specifies a `connection_string`, `container`, `starting_time`, and `ending_time`. This will rehydrate all blobs in the container `my-container` that have a path that represents they were created between `1:00pm` and `2:30pm` UTC time on `October 1, 2023`. -Such a path could look like the following: \ No newline at end of file +Such a path could look like the following: + +``` +year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json +year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json +year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json +``` + +```yaml +azureblobrehydration: + connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" + container: "my-container" + starting_time: 2023-10-01T13:00 + ending_time: 2023-10-01T14:30 + batch_size: 100 + page_size: 1000 +``` +### Using Storage Extension Configuration +This configuration shows using a storage extension to track rehydration progress over agent restarts. The `storage` field is set to the component ID of the storage extension. + + +```yaml +extensions: + file_storage: + directory: $OIQ_OTEL_COLLECTOR_HOME/storage +receivers: + azureblobrehydration: + connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" + container: "my-container" + starting_time: 2023-10-01T13:00 + ending_time: 2023-10-01T14:30 + storage: "file_storage" + batch_size: 100 + page_size: 1000 +``` + +### Root Folder Configuration + +This configuration specifies an additional field `root_folder` to match the `root_folder` value of the Azure Blob Exporter. +The `root_folder` value in the exporter will prefix the blob path with the root folder and it needs to be accounted for in the rehydration receiver. + +Such a path could look like the following: +``` +root/year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json +root/year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json +root/year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json +``` + +```yaml +azureblobrehydration: + connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" + container: "my-container" + starting_time: 2023-10-01T13:00 + ending_time: 2023-10-01T14:30 + root_folder: "root" + batch_size: 100 + page_size: 1000 +``` + +### Delete on read Configuration + +This configuration enables the `delete_on_read` functionality which will delete a blob from Azure after it has been successfully rehydrated into OTLP data and sent onto the next component in the pipeline. + +```yaml +azureblobrehydration: + connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" + container: "my-container" + starting_time: 2023-10-01T13:00 + ending_time: 2023-10-01T14:30 + delete_on_read: true + batch_size: 100 + page_size: 1000 +``` + +## Deprecated Configuration + +The following configuration fields are deprecated and will be removed in a future release. + +| Field | Deprecated | +|-------|----------| +| poll_interval | true | +| poll_timeout | true | diff --git a/receiver/azureblobrehydrationreceiver/config.go b/receiver/azureblobrehydrationreceiver/config.go index 8192870b4..ddceb66dc 100644 --- a/receiver/azureblobrehydrationreceiver/config.go +++ b/receiver/azureblobrehydrationreceiver/config.go @@ -25,7 +25,7 @@ import ( // Config is the configuration for the azure blob rehydration receiver type Config struct { - // BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 1000) + // BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 100) BatchSize int `mapstructure:"batch_size"` // ConnectionString is the Azure Blob Storage connection key, @@ -48,6 +48,17 @@ type Config struct { // Default value of false DeleteOnRead bool `mapstructure:"delete_on_read"` + // Deprecated: PollInterval is no longer required due to streaming blobs for processing. + // if a value is provided, validation will throw an error + // PollInterval is the interval at which the Azure API is scanned for blobs. + // Default value of 1m + PollInterval time.Duration `mapstructure:"poll_interval"` + + // Deprecated: PollTimeout is no longer required due to streaming blobs for processing. + // if a value is provided, validation will throw an error + // PollTimeout is the timeout for the Azure API to scan for blobs. + PollTimeout time.Duration `mapstructure:"poll_timeout"` + // PageSize is the number of blobs to request from the Azure API at a time. (default 1000) PageSize int `mapstructure:"page_size"` @@ -57,6 +68,14 @@ type Config struct { // Validate validates the config func (c *Config) Validate() error { + if c.PollInterval != 0 { + return errors.New("poll_interval is no longer supported and batch_size/page_size should be used instead") + } + + if c.PollTimeout != 0 { + return errors.New("poll_timeout is no longer supported and batch_size/page_size should be used instead") + } + if c.BatchSize < 1 { return errors.New("batch_size must be greater than 0") } diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index df332b9bc..4ba4bc83a 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -132,8 +132,6 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (* }, nil } -// move delimiter to transform block - // Start starts the rehydration receiver func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) error { if r.cfg.StorageID != nil { @@ -143,7 +141,11 @@ func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) er } r.checkpointStore = checkpointStore } - go r.streamRehydrateBlobs(ctx) + + cancelCtx, cancel := context.WithCancel(ctx) + r.cancelFunc = cancel + + go r.streamRehydrateBlobs(cancelCtx) return nil } @@ -175,13 +177,10 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { prefix = &r.cfg.RootFolder } - cancelCtx, cancel := context.WithCancel(ctx) - r.cancelFunc = cancel - startTime := time.Now() r.logger.Info("Starting rehydration", zap.Time("startTime", startTime)) - go r.azureClient.StreamBlobs(cancelCtx, r.cfg.Container, prefix, r.errChan, r.blobChan, r.doneChan) + go r.azureClient.StreamBlobs(ctx, r.cfg.Container, prefix, r.errChan, r.blobChan, r.doneChan) for { select { @@ -201,6 +200,7 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) { // Go through each blob and parse it's path to determine if we should consume it or not + r.logger.Debug("parsing through blobs", zap.Int("num_blobs", len(blobs))) for _, blob := range blobs { select { case <-ctx.Done(): @@ -230,7 +230,6 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure if !errors.Is(err, context.Canceled) { r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) } - continue } From 5a7ceb96937efff847d808aa69af95f18dc80a06 Mon Sep 17 00:00:00 2001 From: schmikei Date: Tue, 14 Jan 2025 14:35:38 -0500 Subject: [PATCH 08/17] only log messages rather than submit validation errors --- .../azureblobrehydrationreceiver/config.go | 8 ---- .../config_test.go | 15 +++++++ .../azureblobrehydrationreceiver/receiver.go | 12 +++++ .../receiver_test.go | 44 +++++++++++++++++++ 4 files changed, 71 insertions(+), 8 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/config.go b/receiver/azureblobrehydrationreceiver/config.go index ddceb66dc..9201f87a2 100644 --- a/receiver/azureblobrehydrationreceiver/config.go +++ b/receiver/azureblobrehydrationreceiver/config.go @@ -68,14 +68,6 @@ type Config struct { // Validate validates the config func (c *Config) Validate() error { - if c.PollInterval != 0 { - return errors.New("poll_interval is no longer supported and batch_size/page_size should be used instead") - } - - if c.PollTimeout != 0 { - return errors.New("poll_timeout is no longer supported and batch_size/page_size should be used instead") - } - if c.BatchSize < 1 { return errors.New("batch_size must be greater than 0") } diff --git a/receiver/azureblobrehydrationreceiver/config_test.go b/receiver/azureblobrehydrationreceiver/config_test.go index 33b50e0e7..4c7a35288 100644 --- a/receiver/azureblobrehydrationreceiver/config_test.go +++ b/receiver/azureblobrehydrationreceiver/config_test.go @@ -125,6 +125,21 @@ func TestConfigValidate(t *testing.T) { }, expectErr: errors.New("ending_time must be at least one minute after starting_time"), }, + { + desc: "with deprecated poll_interval", + cfg: &Config{ + ConnectionString: "connection_string", + Container: "container", + RootFolder: "root", + StartingTime: "2023-10-02T17:00", + EndingTime: "2023-10-02T17:01", + DeleteOnRead: false, + BatchSize: 100, + PageSize: 1000, + }, + // expect no error until future release where poll_interval is removed + expectErr: nil, + }, { desc: "Bad batch_size", cfg: &Config{ diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 4ba4bc83a..66490725a 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -134,6 +134,8 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (* // Start starts the rehydration receiver func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) error { + r.logAnyDeprecationWarnings() + if r.cfg.StorageID != nil { checkpointStore, err := rehydration.NewCheckpointStorage(ctx, host, *r.cfg.StorageID, r.id, r.supportedTelemetry) if err != nil { @@ -149,6 +151,16 @@ func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) er return nil } +func (r *rehydrationReceiver) logAnyDeprecationWarnings() { + if r.cfg.PollInterval != 0 { + r.logger.Warn("poll_interval is no longer recognized and will be removed in a future release. batch_size/page_size should be used instead") + } + + if r.cfg.PollTimeout != 0 { + r.logger.Warn("poll_timeout is no longer recognized and will be removed in a future release. batch_size/page_size should be used instead") + } +} + // Shutdown shuts down the rehydration receiver func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { var err error diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index 88f5dc5c2..b2ebf2bf3 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -19,6 +19,8 @@ import ( "compress/gzip" "context" "errors" + "strings" + "sync" "testing" "time" @@ -29,6 +31,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pipeline" "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "github.com/observiq/bindplane-otel-collector/internal/rehydration" "github.com/observiq/bindplane-otel-collector/internal/testutils" @@ -516,6 +519,47 @@ func Test_processBlob(t *testing.T) { } } +func TestLogsDeprecationWarnings(t *testing.T) { + mockClient := setNewAzureBlobClient(t) + + testLogger, ol := observer.New(zap.WarnLevel) + + r := &rehydrationReceiver{ + logger: zap.New(testLogger), + cfg: &Config{ + StartingTime: "2023-10-02T17:00", + EndingTime: "2023-10-02T17:01", + PollInterval: 1 * time.Second, + PollTimeout: 1 * time.Second, + }, + azureClient: mockClient, + blobChan: make(chan *azureblob.BlobResults), + errChan: make(chan error), + doneChan: make(chan struct{}), + mut: &sync.Mutex{}, + checkpointStore: rehydration.NewNopStorage(), + } + mockClient.On("StreamBlobs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + close(r.doneChan) + }) + + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + + require.Eventually(t, func() bool { + foundBothLogs := false + foundPollInterval := false + for _, log := range ol.All() { + if strings.Contains(log.Message, "poll_interval is no longer recognized and will be removed in a future release. batch_size/page_size should be used instead") { + foundBothLogs = true + } + if strings.Contains(log.Message, "poll_interval is no longer recognized and will be removed in a future release. batch_size/page_size should be used instead") { + foundPollInterval = true + } + } + return foundBothLogs && foundPollInterval + }, 10*time.Second, 1*time.Second) +} + // setNewAzureBlobClient helper function used to set the newAzureBlobClient // function with a mock and return the mock. func setNewAzureBlobClient(t *testing.T) *blobmocks.MockBlobClient { From 6c0dbc1485143c49605695e3baeeb81e7c261af4 Mon Sep 17 00:00:00 2001 From: schmikei Date: Wed, 15 Jan 2025 10:04:24 -0500 Subject: [PATCH 09/17] spin up goroutine per blob in the batch, change default batch size to 30 to keep a moderate default --- .../azureblobrehydrationreceiver/README.md | 4 +- .../azureblobrehydrationreceiver/config.go | 2 +- .../config_test.go | 20 +++--- .../azureblobrehydrationreceiver/factory.go | 7 +- .../factory_test.go | 4 +- .../internal/azureblob/blob_client.go | 18 +---- .../azureblobrehydrationreceiver/receiver.go | 68 ++++++++++++++----- .../receiver_test.go | 12 ++-- 8 files changed, 80 insertions(+), 55 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/README.md b/receiver/azureblobrehydrationreceiver/README.md index 0a4811843..eeda7c234 100644 --- a/receiver/azureblobrehydrationreceiver/README.md +++ b/receiver/azureblobrehydrationreceiver/README.md @@ -33,8 +33,8 @@ This is not a traditional receiver that continually produces data but rather reh | storage | string | | `false` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. | | poll_interval* | string | | `false` | The interval at which the Azure API is scanned for blobs. | | poll_timeout* | string | | `false` | The timeout for the Azure API to scan for blobs. | -| batch_size | int | `100` | `false` | The number of blobs to continue processing in the pipeline before sending more data to the pipeline. | -| page_size | int | `1000` | `false` | The maximum number of blobs to request in a single API call. | +| batch_size | int | `30` | `false` | The number of blobs to download and process in the pipeline simultaneously. This parameter directly impacts performance by controlling the concurrent blob download limit. | +| page_size | int | `1000` | `false` | The maximum number of blob information to request in a single API call. | > Deprecated*: `poll_interval` and `poll_timeout` are no longer supported and `batch_size`/`page_size` should be used instead. diff --git a/receiver/azureblobrehydrationreceiver/config.go b/receiver/azureblobrehydrationreceiver/config.go index 9201f87a2..06d87a86d 100644 --- a/receiver/azureblobrehydrationreceiver/config.go +++ b/receiver/azureblobrehydrationreceiver/config.go @@ -25,7 +25,7 @@ import ( // Config is the configuration for the azure blob rehydration receiver type Config struct { - // BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 100) + // BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 30) BatchSize int `mapstructure:"batch_size"` // ConnectionString is the Azure Blob Storage connection key, diff --git a/receiver/azureblobrehydrationreceiver/config_test.go b/receiver/azureblobrehydrationreceiver/config_test.go index 4c7a35288..9a4adb5c0 100644 --- a/receiver/azureblobrehydrationreceiver/config_test.go +++ b/receiver/azureblobrehydrationreceiver/config_test.go @@ -36,7 +36,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, expectErr: errors.New("connection_string is required"), @@ -50,7 +50,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, expectErr: errors.New("container is required"), @@ -64,7 +64,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, expectErr: errors.New("starting_time is invalid: missing value"), @@ -78,7 +78,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, expectErr: errors.New("ending_time is invalid: missing value"), @@ -92,7 +92,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "invalid_time", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, expectErr: errors.New("starting_time is invalid: invalid timestamp"), @@ -106,7 +106,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "invalid_time", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, expectErr: errors.New("ending_time is invalid: invalid timestamp"), @@ -120,7 +120,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T16:00", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, expectErr: errors.New("ending_time must be at least one minute after starting_time"), @@ -134,7 +134,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, // expect no error until future release where poll_interval is removed @@ -163,7 +163,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 0, }, expectErr: errors.New("page_size must be greater than 0"), @@ -177,7 +177,7 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - BatchSize: 100, + BatchSize: 30, PageSize: 1000, }, expectErr: nil, diff --git a/receiver/azureblobrehydrationreceiver/factory.go b/receiver/azureblobrehydrationreceiver/factory.go index 3309b59a2..cc87dd826 100644 --- a/receiver/azureblobrehydrationreceiver/factory.go +++ b/receiver/azureblobrehydrationreceiver/factory.go @@ -27,6 +27,9 @@ import ( // errImproperCfgType error for when an invalid config type is passed to receiver creation funcs var errImproperCfgType = errors.New("improper config type") +const defaultBatchSize = 30 +const defaultPageSize = 1000 + // NewFactory creates a new receiver factory func NewFactory() receiver.Factory { return receiver.NewFactory( @@ -42,8 +45,8 @@ func NewFactory() receiver.Factory { func createDefaultConfig() component.Config { return &Config{ DeleteOnRead: false, - BatchSize: 100, - PageSize: 1000, + BatchSize: defaultBatchSize, + PageSize: defaultPageSize, } } diff --git a/receiver/azureblobrehydrationreceiver/factory_test.go b/receiver/azureblobrehydrationreceiver/factory_test.go index 771089d8f..129bf32bb 100644 --- a/receiver/azureblobrehydrationreceiver/factory_test.go +++ b/receiver/azureblobrehydrationreceiver/factory_test.go @@ -23,8 +23,8 @@ import ( func Test_createDefaultConfig(t *testing.T) { expectedCfg := &Config{ DeleteOnRead: false, - BatchSize: 100, - PageSize: 1000, + BatchSize: defaultBatchSize, + PageSize: defaultPageSize, } componentCfg := createDefaultConfig() diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index f0ea14316..231f18751 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -66,7 +66,6 @@ func NewAzureBlobClient(connectionString string, batchSize, pageSize int) (BlobC if err != nil { return nil, err } - return &AzureClient{ azClient: azClient, batchSize: batchSize, @@ -74,8 +73,6 @@ func NewAzureBlobClient(connectionString string, batchSize, pageSize int) (BlobC }, nil } -const emptyPollLimit = 3 - // BlobResults contains the blobs for the receiver to process and the last marker type BlobResults struct { Blobs []*BlobInfo @@ -86,25 +83,18 @@ type BlobResults struct { // then the stream should be stopped func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) { var marker *string + pager := a.azClient.NewListBlobsFlatPager(container, &azblob.ListBlobsFlatOptions{ Marker: marker, Prefix: prefix, MaxResults: &a.pageSize, }) - emptyPollCount := 0 for pager.More() { select { case <-ctx.Done(): return default: - // If we had empty polls for the last 3 times, then we can assume that there are no more blobs to process - // and we can close the stream to avoid charging for the requests - if emptyPollCount == emptyPollLimit { - close(doneChan) - return - } - resp, err := pager.NextPage(ctx) if err != nil { errChan <- fmt.Errorf("error streaming blobs: %w", err) @@ -134,12 +124,6 @@ func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix } } - if len(batch) == 0 { - emptyPollCount++ - continue - } - - emptyPollCount = 0 blobChan <- &BlobResults{ Blobs: batch, LastMarker: marker, diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 66490725a..980d168a2 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -20,6 +20,7 @@ import ( "fmt" "path/filepath" "sync" + "sync/atomic" "time" "github.com/observiq/bindplane-otel-collector/internal/rehydration" @@ -48,7 +49,9 @@ type rehydrationReceiver struct { errChan chan error doneChan chan struct{} + // mutexes for ensuring a thread safe checkpoint mut *sync.Mutex + wg *sync.WaitGroup lastBlob *azureblob.BlobInfo lastBlobTime *time.Time @@ -129,6 +132,7 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (* errChan: make(chan error), doneChan: make(chan struct{}), mut: &sync.Mutex{}, + wg: &sync.WaitGroup{}, }, nil } @@ -164,7 +168,6 @@ func (r *rehydrationReceiver) logAnyDeprecationWarnings() { // Shutdown shuts down the rehydration receiver func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { var err error - // If we have called started then close and wait for goroutine to finish if r.cancelFunc != nil { r.cancelFunc() } @@ -194,6 +197,8 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { go r.azureClient.StreamBlobs(ctx, r.cfg.Container, prefix, r.errChan, r.blobChan, r.doneChan) + emptyPolls := 0 + for { select { case <-ctx.Done(): @@ -205,18 +210,31 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { r.logger.Error("Error streaming blobs, stopping rehydration", zap.Error(err), zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return case br := <-r.blobChan: - r.rehydrateBlobs(ctx, br.Blobs) + numProcessedBlobs := r.rehydrateBlobs(ctx, br.Blobs) + if numProcessedBlobs != 0 { + emptyPolls = 0 + continue + } + + emptyPolls++ + if emptyPolls == emptyPollLimit { + r.logger.Warn("No blobs processed for 3 consecutive polls, assuming no more blobs to process") + return + } } } } -func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) { +const emptyPollLimit = 5 + +func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) (numProcessedBlobs int) { // Go through each blob and parse it's path to determine if we should consume it or not r.logger.Debug("parsing through blobs", zap.Int("num_blobs", len(blobs))) + processedBlobCount := atomic.Int64{} for _, blob := range blobs { select { case <-ctx.Done(): - return + return int(processedBlobCount.Load()) default: } @@ -236,29 +254,40 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure r.lastBlob = blob r.lastBlobTime = blobTime - // Process and consume the blob at the given path - if err := r.processBlob(ctx, blob); err != nil { - // If the error is because the context was canceled, then we don't want to log it - if !errors.Is(err, context.Canceled) { - r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) + r.wg.Add(1) + go func() { + defer r.wg.Done() + select { + case <-ctx.Done(): + return + default: } - continue - } - r.logger.Debug("Processed blob", zap.String("blob", blob.Name)) + // Process and consume the blob at the given path + if err := r.processBlob(ctx, blob); err != nil { + // If the error is because the context was canceled, then we don't want to log it + if !errors.Is(err, context.Canceled) { + r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) + } + return + } + processedBlobCount.Add(1) - // Delete blob if configured to do so - if r.cfg.DeleteOnRead { - if err := r.azureClient.DeleteBlob(ctx, r.cfg.Container, blob.Name); err != nil { + // Delete blob if configured to do so + if err := r.conditionallyDeleteBlob(ctx, blob); err != nil { r.logger.Error("Error while attempting to delete blob", zap.String("blob", blob.Name), zap.Error(err)) } - } + }() } } if err := r.makeCheckpoint(ctx); err != nil { r.logger.Error("Error while saving checkpoint", zap.Error(err)) } + + r.wg.Wait() + + return int(processedBlobCount.Load()) } // processBlob does the following: @@ -312,3 +341,10 @@ func (r *rehydrationReceiver) makeCheckpoint(ctx context.Context) error { r.checkpoint.UpdateCheckpoint(*r.lastBlobTime, r.lastBlob.Name) return r.checkpointStore.SaveCheckpoint(ctx, r.checkpointKey(), r.checkpoint) } + +func (r *rehydrationReceiver) conditionallyDeleteBlob(ctx context.Context, blob *azureblob.BlobInfo) error { + if !r.cfg.DeleteOnRead { + return nil + } + return r.azureClient.DeleteBlob(ctx, r.cfg.Container, blob.Name) +} diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index b2ebf2bf3..805f5680a 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -302,10 +302,12 @@ func Test_fullRehydration(t *testing.T) { }) t.Run("Delete on Read", func(t *testing.T) { - cfg.DeleteOnRead = true - t.Cleanup(func() { - cfg.DeleteOnRead = false - }) + deleteCfg := &Config{ + StartingTime: cfg.StartingTime, + EndingTime: cfg.EndingTime, + Container: cfg.Container, + DeleteOnRead: true, + } // Test data logs, jsonBytes := testutils.GenerateTestLogs(t) @@ -328,7 +330,7 @@ func Test_fullRehydration(t *testing.T) { mockClient := setNewAzureBlobClient(t) // Create new receiver testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + r, err := newLogsReceiver(id, testLogger, deleteCfg, testConsumer) require.NoError(t, err) mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { From 2c3d4eee83cbef3086f1d112d4bc0e7b397eecd1 Mon Sep 17 00:00:00 2001 From: schmikei Date: Wed, 15 Jan 2025 17:45:11 -0500 Subject: [PATCH 10/17] add buffered chan size of 5 to start --- .../azureblobrehydrationreceiver/config.go | 1 + .../internal/azureblob/blob_client.go | 20 +---- .../azureblob/mocks/mock_blob_client.go | 84 ++----------------- .../azureblobrehydrationreceiver/receiver.go | 20 +++-- .../receiver_test.go | 26 ++---- 5 files changed, 30 insertions(+), 121 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/config.go b/receiver/azureblobrehydrationreceiver/config.go index 06d87a86d..8c934ecb6 100644 --- a/receiver/azureblobrehydrationreceiver/config.go +++ b/receiver/azureblobrehydrationreceiver/config.go @@ -26,6 +26,7 @@ import ( // Config is the configuration for the azure blob rehydration receiver type Config struct { // BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 30) + // This number directly affects the number of goroutines that will be created to process the blobs. BatchSize int `mapstructure:"batch_size"` // ConnectionString is the Azure Blob Storage connection key, diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index 231f18751..f3fb2201e 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -42,7 +42,7 @@ type BlobClient interface { // StreamBlobs will stream BlobInfo to the blobChan and errors to the errChan, generally if an errChan gets an item // then the stream should be stopped - StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) + StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan []*BlobInfo, doneChan chan struct{}) } type blobClient interface { @@ -73,15 +73,9 @@ func NewAzureBlobClient(connectionString string, batchSize, pageSize int) (BlobC }, nil } -// BlobResults contains the blobs for the receiver to process and the last marker -type BlobResults struct { - Blobs []*BlobInfo - LastMarker *string -} - // StreamBlobs will stream blobs to the blobChan and errors to the errChan, generally if an errChan gets an item // then the stream should be stopped -func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) { +func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan []*BlobInfo, doneChan chan struct{}) { var marker *string pager := a.azClient.NewListBlobsFlatPager(container, &azblob.ListBlobsFlatOptions{ @@ -116,18 +110,12 @@ func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix } batch = append(batch, info) if len(batch) == int(a.batchSize) { - blobChan <- &BlobResults{ - Blobs: batch, - LastMarker: marker, - } + blobChan <- batch batch = []*BlobInfo{} } } - blobChan <- &BlobResults{ - Blobs: batch, - LastMarker: marker, - } + blobChan <- batch marker = resp.NextMarker } } diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go index 621e9211b..32b5cdc7d 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.49.0. DO NOT EDIT. +// Code generated by mockery v2.51.0. DO NOT EDIT. package mocks @@ -130,78 +130,8 @@ func (_c *MockBlobClient_DownloadBlob_Call) RunAndReturn(run func(context.Contex return _c } -// ListBlobs provides a mock function with given fields: ctx, container, prefix, marker -func (_m *MockBlobClient) ListBlobs(ctx context.Context, container string, prefix *string, marker *string) ([]*azureblob.BlobInfo, *string, error) { - ret := _m.Called(ctx, container, prefix, marker) - - if len(ret) == 0 { - panic("no return value specified for ListBlobs") - } - - var r0 []*azureblob.BlobInfo - var r1 *string - var r2 error - if rf, ok := ret.Get(0).(func(context.Context, string, *string, *string) ([]*azureblob.BlobInfo, *string, error)); ok { - return rf(ctx, container, prefix, marker) - } - if rf, ok := ret.Get(0).(func(context.Context, string, *string, *string) []*azureblob.BlobInfo); ok { - r0 = rf(ctx, container, prefix, marker) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*azureblob.BlobInfo) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, *string, *string) *string); ok { - r1 = rf(ctx, container, prefix, marker) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(*string) - } - } - - if rf, ok := ret.Get(2).(func(context.Context, string, *string, *string) error); ok { - r2 = rf(ctx, container, prefix, marker) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - -// MockBlobClient_ListBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListBlobs' -type MockBlobClient_ListBlobs_Call struct { - *mock.Call -} - -// ListBlobs is a helper method to define mock.On call -// - ctx context.Context -// - container string -// - prefix *string -// - marker *string -func (_e *MockBlobClient_Expecter) ListBlobs(ctx interface{}, container interface{}, prefix interface{}, marker interface{}) *MockBlobClient_ListBlobs_Call { - return &MockBlobClient_ListBlobs_Call{Call: _e.mock.On("ListBlobs", ctx, container, prefix, marker)} -} - -func (_c *MockBlobClient_ListBlobs_Call) Run(run func(ctx context.Context, container string, prefix *string, marker *string)) *MockBlobClient_ListBlobs_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(*string)) - }) - return _c -} - -func (_c *MockBlobClient_ListBlobs_Call) Return(_a0 []*azureblob.BlobInfo, _a1 *string, _a2 error) *MockBlobClient_ListBlobs_Call { - _c.Call.Return(_a0, _a1, _a2) - return _c -} - -func (_c *MockBlobClient_ListBlobs_Call) RunAndReturn(run func(context.Context, string, *string, *string) ([]*azureblob.BlobInfo, *string, error)) *MockBlobClient_ListBlobs_Call { - _c.Call.Return(run) - return _c -} - // StreamBlobs provides a mock function with given fields: ctx, container, prefix, errChan, blobChan, doneChan -func (_m *MockBlobClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{}) { +func (_m *MockBlobClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan []*azureblob.BlobInfo, doneChan chan struct{}) { _m.Called(ctx, container, prefix, errChan, blobChan, doneChan) } @@ -215,15 +145,15 @@ type MockBlobClient_StreamBlobs_Call struct { // - container string // - prefix *string // - errChan chan error -// - blobChan chan *azureblob.BlobResults +// - blobChan chan []*azureblob.BlobInfo // - doneChan chan struct{} func (_e *MockBlobClient_Expecter) StreamBlobs(ctx interface{}, container interface{}, prefix interface{}, errChan interface{}, blobChan interface{}, doneChan interface{}) *MockBlobClient_StreamBlobs_Call { return &MockBlobClient_StreamBlobs_Call{Call: _e.mock.On("StreamBlobs", ctx, container, prefix, errChan, blobChan, doneChan)} } -func (_c *MockBlobClient_StreamBlobs_Call) Run(run func(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{})) *MockBlobClient_StreamBlobs_Call { +func (_c *MockBlobClient_StreamBlobs_Call) Run(run func(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan []*azureblob.BlobInfo, doneChan chan struct{})) *MockBlobClient_StreamBlobs_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(chan error), args[4].(chan *azureblob.BlobResults), args[5].(chan struct{})) + run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(chan error), args[4].(chan []*azureblob.BlobInfo), args[5].(chan struct{})) }) return _c } @@ -233,8 +163,8 @@ func (_c *MockBlobClient_StreamBlobs_Call) Return() *MockBlobClient_StreamBlobs_ return _c } -func (_c *MockBlobClient_StreamBlobs_Call) RunAndReturn(run func(context.Context, string, *string, chan error, chan *azureblob.BlobResults, chan struct{})) *MockBlobClient_StreamBlobs_Call { - _c.Call.Return(run) +func (_c *MockBlobClient_StreamBlobs_Call) RunAndReturn(run func(context.Context, string, *string, chan error, chan []*azureblob.BlobInfo, chan struct{})) *MockBlobClient_StreamBlobs_Call { + _c.Run(run) return _c } diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 980d168a2..cc26e4ca7 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -45,7 +45,7 @@ type rehydrationReceiver struct { checkpoint *rehydration.CheckPoint checkpointStore rehydration.CheckpointStorer - blobChan chan *azureblob.BlobResults + blobChan chan []*azureblob.BlobInfo errChan chan error doneChan chan struct{} @@ -101,6 +101,10 @@ func newTracesReceiver(id component.ID, logger *zap.Logger, cfg *Config, nextCon return r, nil } +// factor of buffered channel size +// number of blobs to process at a time is blobChanSize * batchSize +const blobChanSize = 5 + // newRehydrationReceiver creates a new rehydration receiver func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (*rehydrationReceiver, error) { azureClient, err := newAzureBlobClient(cfg.ConnectionString, cfg.BatchSize, cfg.PageSize) @@ -128,7 +132,7 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (* checkpointStore: rehydration.NewNopStorage(), startingTime: startingTime, endingTime: endingTime, - blobChan: make(chan *azureblob.BlobResults), + blobChan: make(chan []*azureblob.BlobInfo, blobChanSize), errChan: make(chan error), doneChan: make(chan struct{}), mut: &sync.Mutex{}, @@ -168,6 +172,9 @@ func (r *rehydrationReceiver) logAnyDeprecationWarnings() { // Shutdown shuts down the rehydration receiver func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { var err error + // we can wait for the wg to finish as we know that all the in-flight blobs are done processing + r.wg.Wait() + if r.cancelFunc != nil { r.cancelFunc() } @@ -175,6 +182,7 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { r.logger.Error("Error while saving checkpoint", zap.Error(err)) err = errors.Join(err, err) } + r.logger.Info("last blob that was processed", zap.String("blob", r.lastBlob.Name), zap.Time("time", *r.lastBlobTime)) err = errors.Join(err, r.checkpointStore.Close(ctx)) return err } @@ -210,17 +218,12 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { r.logger.Error("Error streaming blobs, stopping rehydration", zap.Error(err), zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return case br := <-r.blobChan: - numProcessedBlobs := r.rehydrateBlobs(ctx, br.Blobs) + numProcessedBlobs := r.rehydrateBlobs(ctx, br) if numProcessedBlobs != 0 { emptyPolls = 0 continue } - emptyPolls++ - if emptyPolls == emptyPollLimit { - r.logger.Warn("No blobs processed for 3 consecutive polls, assuming no more blobs to process") - return - } } } } @@ -262,7 +265,6 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure return default: } - // Process and consume the blob at the given path if err := r.processBlob(ctx, blob); err != nil { // If the error is because the context was canceled, then we don't want to log it diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index 805f5680a..ba7371332 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -145,9 +145,7 @@ func Test_fullRehydration(t *testing.T) { require.NoError(t, err) mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { - r.blobChan <- &azureblob.BlobResults{ - Blobs: returnedBlobInfo, - } + r.blobChan <- returnedBlobInfo close(r.doneChan) }) @@ -190,9 +188,7 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { - r.blobChan <- &azureblob.BlobResults{ - Blobs: returnedBlobInfo, - } + r.blobChan <- returnedBlobInfo close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { @@ -234,9 +230,7 @@ func Test_fullRehydration(t *testing.T) { require.NoError(t, err) mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { - r.blobChan <- &azureblob.BlobResults{ - Blobs: returnedBlobInfo, - } + r.blobChan <- returnedBlobInfo close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { @@ -281,9 +275,7 @@ func Test_fullRehydration(t *testing.T) { require.NoError(t, err) mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { - r.blobChan <- &azureblob.BlobResults{ - Blobs: returnedBlobInfo, - } + r.blobChan <- returnedBlobInfo close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { @@ -334,9 +326,7 @@ func Test_fullRehydration(t *testing.T) { require.NoError(t, err) mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { - r.blobChan <- &azureblob.BlobResults{ - Blobs: returnedBlobInfo, - } + r.blobChan <- returnedBlobInfo close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { @@ -389,9 +379,7 @@ func Test_fullRehydration(t *testing.T) { require.NoError(t, err) mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { - r.blobChan <- &azureblob.BlobResults{ - Blobs: returnedBlobInfo, - } + r.blobChan <- returnedBlobInfo close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { @@ -535,7 +523,7 @@ func TestLogsDeprecationWarnings(t *testing.T) { PollTimeout: 1 * time.Second, }, azureClient: mockClient, - blobChan: make(chan *azureblob.BlobResults), + blobChan: make(chan []*azureblob.BlobInfo), errChan: make(chan error), doneChan: make(chan struct{}), mut: &sync.Mutex{}, From 43ab839fe257352360bf51c292280689188c24c7 Mon Sep 17 00:00:00 2001 From: schmikei Date: Wed, 15 Jan 2025 17:49:01 -0500 Subject: [PATCH 11/17] remove 3 empty request limit --- receiver/azureblobrehydrationreceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index cc26e4ca7..ebdeda279 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -237,7 +237,7 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure for _, blob := range blobs { select { case <-ctx.Done(): - return int(processedBlobCount.Load()) + break default: } From 80062799d5ad2173afd57ffc57737aff66b45910 Mon Sep 17 00:00:00 2001 From: schmikei Date: Wed, 15 Jan 2025 17:50:37 -0500 Subject: [PATCH 12/17] remove testing log line --- receiver/azureblobrehydrationreceiver/receiver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index ebdeda279..17c2bac6f 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -182,7 +182,6 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { r.logger.Error("Error while saving checkpoint", zap.Error(err)) err = errors.Join(err, err) } - r.logger.Info("last blob that was processed", zap.String("blob", r.lastBlob.Name), zap.Time("time", *r.lastBlobTime)) err = errors.Join(err, r.checkpointStore.Close(ctx)) return err } From 0f17cba4355a8bf540038d2d0c1858efe3efd78b Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 16 Jan 2025 11:32:17 -0500 Subject: [PATCH 13/17] harden shutdown logic to close channel with a timeout --- .../azureblobrehydrationreceiver/receiver.go | 62 +++++++++++++------ .../receiver_test.go | 6 -- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 17c2bac6f..06c3e8a92 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -171,19 +171,40 @@ func (r *rehydrationReceiver) logAnyDeprecationWarnings() { // Shutdown shuts down the rehydration receiver func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { - var err error - // we can wait for the wg to finish as we know that all the in-flight blobs are done processing - r.wg.Wait() - if r.cancelFunc != nil { r.cancelFunc() } - if err := r.makeCheckpoint(ctx); err != nil { + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var errs error + + // signal shutdown intent + select { + case r.doneChan <- struct{}{}: + default: + } + + // wait for any in-progress operations to finish + done := make(chan struct{}) + go func() { + r.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-shutdownCtx.Done(): + return fmt.Errorf("shutdown timeout: %w", shutdownCtx.Err()) + } + + if err := r.makeCheckpoint(shutdownCtx); err != nil { r.logger.Error("Error while saving checkpoint", zap.Error(err)) err = errors.Join(err, err) } - err = errors.Join(err, r.checkpointStore.Close(ctx)) - return err + + return errors.Join(errs, r.checkpointStore.Close(shutdownCtx)) } func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { @@ -204,8 +225,6 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { go r.azureClient.StreamBlobs(ctx, r.cfg.Container, prefix, r.errChan, r.blobChan, r.doneChan) - emptyPolls := 0 - for { select { case <-ctx.Done(): @@ -216,13 +235,12 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { case err := <-r.errChan: r.logger.Error("Error streaming blobs, stopping rehydration", zap.Error(err), zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return - case br := <-r.blobChan: - numProcessedBlobs := r.rehydrateBlobs(ctx, br) - if numProcessedBlobs != 0 { - emptyPolls = 0 - continue + case br, ok := <-r.blobChan: + if !ok { + return } - emptyPolls++ + numProcessedBlobs := r.rehydrateBlobs(ctx, br) + r.logger.Debug("Processed a number of blobs", zap.Int("num_processed_blobs", numProcessedBlobs)) } } } @@ -253,9 +271,6 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure continue } - r.lastBlob = blob - r.lastBlobTime = blobTime - r.wg.Add(1) go func() { defer r.wg.Done() @@ -278,16 +293,23 @@ func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azure if err := r.conditionallyDeleteBlob(ctx, blob); err != nil { r.logger.Error("Error while attempting to delete blob", zap.String("blob", blob.Name), zap.Error(err)) } + + if r.lastBlobTime == nil || r.lastBlobTime.Before(*blobTime) { + r.mut.Lock() + r.lastBlob = blob + r.lastBlobTime = blobTime + r.mut.Unlock() + } }() } } + r.wg.Wait() + if err := r.makeCheckpoint(ctx); err != nil { r.logger.Error("Error while saving checkpoint", zap.Error(err)) } - r.wg.Wait() - return int(processedBlobCount.Load()) } diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index ba7371332..3a7af9e6b 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -146,7 +146,6 @@ func Test_fullRehydration(t *testing.T) { mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { r.blobChan <- returnedBlobInfo - close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { @@ -189,7 +188,6 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { r.blobChan <- returnedBlobInfo - close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -231,7 +229,6 @@ func Test_fullRehydration(t *testing.T) { mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { r.blobChan <- returnedBlobInfo - close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -276,7 +273,6 @@ func Test_fullRehydration(t *testing.T) { mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { r.blobChan <- returnedBlobInfo - close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -327,7 +323,6 @@ func Test_fullRehydration(t *testing.T) { mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { r.blobChan <- returnedBlobInfo - close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -380,7 +375,6 @@ func Test_fullRehydration(t *testing.T) { mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { r.blobChan <- returnedBlobInfo - close(r.doneChan) }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) From 4bac0259e7a99cdffddafa0fbb52dc63dd08754d Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 16 Jan 2025 11:36:23 -0500 Subject: [PATCH 14/17] fix lint --- receiver/azureblobrehydrationreceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 06c3e8a92..54461fa77 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -175,7 +175,7 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { r.cancelFunc() } - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() var errs error From 537270ee66d87edb32761cd1e2a56b225045e143 Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 16 Jan 2025 15:11:58 -0500 Subject: [PATCH 15/17] dakota PR feedback --- .../azureblobrehydrationreceiver/README.md | 4 +- .../internal/azureblob/blob_client.go | 51 ++++++++++--------- .../azureblobrehydrationreceiver/receiver.go | 12 ++--- 3 files changed, 34 insertions(+), 33 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/README.md b/receiver/azureblobrehydrationreceiver/README.md index eeda7c234..e6a35f306 100644 --- a/receiver/azureblobrehydrationreceiver/README.md +++ b/receiver/azureblobrehydrationreceiver/README.md @@ -13,13 +13,15 @@ This is not a traditional receiver that continually produces data but rather reh - Traces ## How it works -1. The receiver polls blob storage for pages of blobs in the specified container. There is no current way of specifying a time range to rehydrate so any blobs outside fo the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration. +1. The receiver polls blob storage for pages of blobs in the specified container. 2. The receiver will parse each blob's path to determine if it matches a path created by the [Azure Blob Exporter](../../exporter/azureblobexporter/README.md#blob-path). 3. If the blob path is from the exporter, the receiver will parse the timestamp represented by the path. 4. If the timestamp is within the configured range the receiver will download the blob and parse its contents into OTLP data. a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip. +> Note: There is no current way of specifying a time range to rehydrate so any blobs outside fo the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration. + ## Configuration | Field | Type | Default | Required | Description | diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index f3fb2201e..ae6498eb2 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -89,35 +89,36 @@ func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix case <-ctx.Done(): return default: - resp, err := pager.NextPage(ctx) - if err != nil { - errChan <- fmt.Errorf("error streaming blobs: %w", err) - return - } + } + + resp, err := pager.NextPage(ctx) + if err != nil { + errChan <- fmt.Errorf("error streaming blobs: %w", err) + return + } - batch := []*BlobInfo{} - for _, blob := range resp.Segment.BlobItems { - if blob.Deleted != nil && *blob.Deleted { - continue - } - if blob.Name == nil || blob.Properties == nil || blob.Properties.ContentLength == nil { - continue - } - - info := &BlobInfo{ - Name: *blob.Name, - Size: *blob.Properties.ContentLength, - } - batch = append(batch, info) - if len(batch) == int(a.batchSize) { - blobChan <- batch - batch = []*BlobInfo{} - } + batch := []*BlobInfo{} + for _, blob := range resp.Segment.BlobItems { + if blob.Deleted != nil && *blob.Deleted { + continue + } + if blob.Name == nil || blob.Properties == nil || blob.Properties.ContentLength == nil { + continue } - blobChan <- batch - marker = resp.NextMarker + info := &BlobInfo{ + Name: *blob.Name, + Size: *blob.Properties.ContentLength, + } + batch = append(batch, info) + if len(batch) == int(a.batchSize) { + blobChan <- batch + batch = []*BlobInfo{} + } } + + blobChan <- batch + marker = resp.NextMarker } close(doneChan) diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 54461fa77..bfef10a95 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -178,13 +178,8 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - var errs error - // signal shutdown intent - select { - case r.doneChan <- struct{}{}: - default: - } + close(r.doneChan) // wait for any in-progress operations to finish done := make(chan struct{}) @@ -199,6 +194,7 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { return fmt.Errorf("shutdown timeout: %w", shutdownCtx.Err()) } + var errs error if err := r.makeCheckpoint(shutdownCtx); err != nil { r.logger.Error("Error while saving checkpoint", zap.Error(err)) err = errors.Join(err, err) @@ -228,6 +224,7 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { for { select { case <-ctx.Done(): + r.logger.Info("Context cancelled, stopping rehydration", zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return case <-r.doneChan: r.logger.Info("Finished rehydrating blobs", zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) @@ -237,6 +234,7 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { return case br, ok := <-r.blobChan: if !ok { + r.logger.Info("Finished rehydrating blobs", zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return } numProcessedBlobs := r.rehydrateBlobs(ctx, br) @@ -249,7 +247,7 @@ const emptyPollLimit = 5 func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) (numProcessedBlobs int) { // Go through each blob and parse it's path to determine if we should consume it or not - r.logger.Debug("parsing through blobs", zap.Int("num_blobs", len(blobs))) + r.logger.Debug("received a batch of blobs, parsing through them to determine if they should be rehydrated", zap.Int("num_blobs", len(blobs))) processedBlobCount := atomic.Int64{} for _, blob := range blobs { select { From 2ed65a8a7a7a069480580fcb9ed638a6896b39cc Mon Sep 17 00:00:00 2001 From: schmikei Date: Thu, 16 Jan 2025 15:49:14 -0500 Subject: [PATCH 16/17] more feedback; sans checkpointing after every blob --- .../internal/azureblob/blob_client.go | 1 - receiver/azureblobrehydrationreceiver/receiver.go | 10 ++++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index ae6498eb2..825e84a8a 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -118,7 +118,6 @@ func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix } blobChan <- batch - marker = resp.NextMarker } close(doneChan) diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index bfef10a95..eb18b1f6e 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -197,10 +197,14 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { var errs error if err := r.makeCheckpoint(shutdownCtx); err != nil { r.logger.Error("Error while saving checkpoint", zap.Error(err)) - err = errors.Join(err, err) + errs = errors.Join(errs, fmt.Errorf("error while saving checkpoint: %w", err)) } - return errors.Join(errs, r.checkpointStore.Close(shutdownCtx)) + if err := r.checkpointStore.Close(shutdownCtx); err != nil { + errs = errors.Join(errs, fmt.Errorf("error while closing checkpoint store: %w", err)) + } + + return errs } func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { @@ -243,8 +247,6 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { } } -const emptyPollLimit = 5 - func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) (numProcessedBlobs int) { // Go through each blob and parse it's path to determine if we should consume it or not r.logger.Debug("received a batch of blobs, parsing through them to determine if they should be rehydrated", zap.Int("num_blobs", len(blobs))) From 4e57ff6ba80c26d8186b8948cf9ed4d23e2e5f5d Mon Sep 17 00:00:00 2001 From: schmikei Date: Fri, 17 Jan 2025 09:26:14 -0500 Subject: [PATCH 17/17] minor updates; pr review --- receiver/azureblobrehydrationreceiver/README.md | 2 +- receiver/azureblobrehydrationreceiver/receiver.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/receiver/azureblobrehydrationreceiver/README.md b/receiver/azureblobrehydrationreceiver/README.md index e6a35f306..0ae988300 100644 --- a/receiver/azureblobrehydrationreceiver/README.md +++ b/receiver/azureblobrehydrationreceiver/README.md @@ -20,7 +20,7 @@ This is not a traditional receiver that continually produces data but rather reh a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip. -> Note: There is no current way of specifying a time range to rehydrate so any blobs outside fo the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration. +> Note: There is no current way of specifying a time range to rehydrate so any blobs outside of the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration. ## Configuration diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index eb18b1f6e..b1b323b46 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -196,7 +196,6 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { var errs error if err := r.makeCheckpoint(shutdownCtx); err != nil { - r.logger.Error("Error while saving checkpoint", zap.Error(err)) errs = errors.Join(errs, fmt.Errorf("error while saving checkpoint: %w", err)) } @@ -249,7 +248,7 @@ func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) (numProcessedBlobs int) { // Go through each blob and parse it's path to determine if we should consume it or not - r.logger.Debug("received a batch of blobs, parsing through them to determine if they should be rehydrated", zap.Int("num_blobs", len(blobs))) + r.logger.Debug("Received a batch of blobs, parsing through them to determine if they should be rehydrated", zap.Int("num_blobs", len(blobs))) processedBlobCount := atomic.Int64{} for _, blob := range blobs { select {