diff --git a/receiver/azureblobrehydrationreceiver/README.md b/receiver/azureblobrehydrationreceiver/README.md index 906b9ecb9..0ae988300 100644 --- a/receiver/azureblobrehydrationreceiver/README.md +++ b/receiver/azureblobrehydrationreceiver/README.md @@ -13,25 +13,32 @@ This is not a traditional receiver that continually produces data but rather reh - Traces ## How it works -1. The receiver polls blob storage for all blobs in the specified container. +1. The receiver polls blob storage for pages of blobs in the specified container. 2. The receiver will parse each blob's path to determine if it matches a path created by the [Azure Blob Exporter](../../exporter/azureblobexporter/README.md#blob-path). 3. If the blob path is from the exporter, the receiver will parse the timestamp represented by the path. 4. If the timestamp is within the configured range the receiver will download the blob and parse its contents into OTLP data. a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip. +> Note: There is currently no way to pass a time range to the blob listing API, so blobs outside of the time range are still retrieved from the API and then filtered using the `starting_time` and `ending_time` configuration. + ## Configuration -| Field | Type | Default | Required | Description | -|--------------------|-----------|------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. | -| container | string | | `true` | The name of the container to rehydrate from. | -| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. 
| -| starting_time | string | | `true ` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | -| ending_time | string | | `true ` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | -| delete_on_read | bool | `false` | `false ` | If `true` the blob will be deleted after being rehydrated. | -| poll_interval | string | `1m` | `false ` | How often to read a new set of blobs. This value is mostly to control how often the blob API is called to ensure once rehydration is done the receiver isn't making too many API calls. | -| poll_timeout | string | `30s` | `false ` | The timeout used when reading blobs from Azure. | -| storage | string | | `false ` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. | + +| Field | Type | Default | Required | Description | +|-------|------|---------|----------|-------------| +| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. | +| container | string | | `true` | The name of the container to rehydrate from. | +| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. | +| starting_time | string | | `true` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | +| ending_time | string | | `true` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. | +| delete_on_read | bool | `false` | `false` | If `true` the blob will be deleted after being rehydrated. | +| storage | string | | `false` | The component ID of a storage extension. 
The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. | +| poll_interval* | string | | `false` | The interval at which the Azure API is scanned for blobs. | +| poll_timeout* | string | | `false` | The timeout for the Azure API to scan for blobs. | +| batch_size | int | `30` | `false` | The number of blobs to download and process in the pipeline simultaneously. This parameter directly impacts performance by controlling the concurrent blob download limit. | +| page_size | int | `1000` | `false` | The maximum number of blob entries to request in a single API call. | + +> Deprecated*: `poll_interval` and `poll_timeout` are deprecated, are no longer used, and will be removed in a future release; `batch_size`/`page_size` should be used instead. ## Example Configuration @@ -41,6 +48,7 @@ This configuration specifies a `connection_string`, `container`, `starting_time` This will rehydrate all blobs in the container `my-container` that have a path that represents they were created between `1:00pm` and `2:30pm` UTC time on `October 1, 2023`. Such a path could look like the following: + ``` year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json @@ -53,10 +61,10 @@ azureblobrehydration: container: "my-container" starting_time: 2023-10-01T13:00 ending_time: 2023-10-01T14:30 + batch_size: 100 + page_size: 1000 ``` - ### Using Storage Extension Configuration - This configuration shows using a storage extension to track rehydration progress over agent restarts. The `storage` field is set to the component ID of the storage extension. 
@@ -64,7 +72,6 @@ This configuration shows using a storage extension to track rehydration progress extensions: file_storage: directory: $OIQ_OTEL_COLLECTOR_HOME/storage - receivers: azureblobrehydration: connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net" @@ -72,6 +79,8 @@ receivers: starting_time: 2023-10-01T13:00 ending_time: 2023-10-01T14:30 storage: "file_storage" + batch_size: 100 + page_size: 1000 ``` ### Root Folder Configuration @@ -93,6 +102,8 @@ azureblobrehydration: starting_time: 2023-10-01T13:00 ending_time: 2023-10-01T14:30 root_folder: "root" + batch_size: 100 + page_size: 1000 ``` ### Delete on read Configuration @@ -106,4 +117,15 @@ azureblobrehydration: starting_time: 2023-10-01T13:00 ending_time: 2023-10-01T14:30 delete_on_read: true + batch_size: 100 + page_size: 1000 ``` + +## Deprecated Configuration + +The following configuration fields are deprecated and will be removed in a future release. + +| Field | Deprecated | +|-------|----------| +| poll_interval | true | +| poll_timeout | true | diff --git a/receiver/azureblobrehydrationreceiver/config.go b/receiver/azureblobrehydrationreceiver/config.go index 1d69e5a57..8c934ecb6 100644 --- a/receiver/azureblobrehydrationreceiver/config.go +++ b/receiver/azureblobrehydrationreceiver/config.go @@ -25,6 +25,10 @@ import ( // Config is the configuration for the azure blob rehydration receiver type Config struct { + // BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 30) + // This number directly affects the number of goroutines that will be created to process the blobs. + BatchSize int `mapstructure:"batch_size"` + // ConnectionString is the Azure Blob Storage connection key, // which can be found in the Azure Blob Storage resource on the Azure Portal. 
(no default) ConnectionString string `mapstructure:"connection_string"` @@ -45,19 +49,30 @@ type Config struct { // Default value of false DeleteOnRead bool `mapstructure:"delete_on_read"` + // Deprecated: PollInterval is no longer required due to streaming blobs for processing. + // If a value is provided, validation does not reject it; a deprecation warning is logged when the receiver starts. // PollInterval is the interval at which the Azure API is scanned for blobs. // Default value of 1m PollInterval time.Duration `mapstructure:"poll_interval"` + // Deprecated: PollTimeout is no longer required due to streaming blobs for processing. + // If a value is provided, validation does not reject it; a deprecation warning is logged when the receiver starts. // PollTimeout is the timeout for the Azure API to scan for blobs. PollTimeout time.Duration `mapstructure:"poll_timeout"` + // PageSize is the number of blobs to request from the Azure API at a time. (default 1000) + PageSize int `mapstructure:"page_size"` + // ID of the storage extension to use for storing progress StorageID *component.ID `mapstructure:"storage"` } // Validate validates the config func (c *Config) Validate() error { + if c.BatchSize < 1 { + return errors.New("batch_size must be greater than 0") + } + if c.ConnectionString == "" { return errors.New("connection_string is required") } @@ -81,12 +96,8 @@ func (c *Config) Validate() error { return errors.New("ending_time must be at least one minute after starting_time") } - if c.PollInterval < time.Second { - return errors.New("poll_interval must be at least one second") - } - - if c.PollTimeout < time.Second { - return errors.New("poll_timeout must be at least one second") + if c.PageSize < 1 { + return errors.New("page_size must be greater than 0") } return nil diff --git a/receiver/azureblobrehydrationreceiver/config_test.go b/receiver/azureblobrehydrationreceiver/config_test.go index 8ddb34ef9..9a4adb5c0 100644 --- a/receiver/azureblobrehydrationreceiver/config_test.go +++ b/receiver/azureblobrehydrationreceiver/config_test.go @@ -17,7 +17,6 @@ package 
azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote import ( "errors" "testing" - "time" "github.com/stretchr/testify/require" ) @@ -37,8 +36,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, expectErr: errors.New("connection_string is required"), }, @@ -51,8 +50,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, expectErr: errors.New("container is required"), }, @@ -65,8 +64,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, expectErr: errors.New("starting_time is invalid: missing value"), }, @@ -79,8 +78,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, expectErr: errors.New("ending_time is invalid: missing value"), }, @@ -93,8 +92,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "invalid_time", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, expectErr: errors.New("starting_time is invalid: invalid timestamp"), }, @@ -107,8 +106,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "invalid_time", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, expectErr: errors.New("ending_time is invalid: invalid timestamp"), }, @@ -121,13 +120,13 @@ func 
TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T16:00", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, expectErr: errors.New("ending_time must be at least one minute after starting_time"), }, { - desc: "Bad poll_interval", + desc: "with deprecated poll_interval", cfg: &Config{ ConnectionString: "connection_string", Container: "container", @@ -135,13 +134,28 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Millisecond, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, - expectErr: errors.New("poll_interval must be at least one second"), + // expect no error until future release where poll_interval is removed + expectErr: nil, + }, + { + desc: "Bad batch_size", + cfg: &Config{ + ConnectionString: "connection_string", + Container: "container", + RootFolder: "root", + StartingTime: "2023-10-02T17:00", + EndingTime: "2023-10-02T17:01", + DeleteOnRead: false, + BatchSize: 0, + PageSize: 1000, + }, + expectErr: errors.New("batch_size must be greater than 0"), }, { - desc: "Bad poll_timeout", + desc: "Bad page_size", cfg: &Config{ ConnectionString: "connection_string", Container: "container", @@ -149,10 +163,10 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second * 2, - PollTimeout: time.Millisecond, + BatchSize: 30, + PageSize: 0, }, - expectErr: errors.New("poll_timeout must be at least one second"), + expectErr: errors.New("page_size must be greater than 0"), }, { desc: "Valid config", @@ -163,8 +177,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 30, + PageSize: 1000, }, 
expectErr: nil, }, diff --git a/receiver/azureblobrehydrationreceiver/factory.go b/receiver/azureblobrehydrationreceiver/factory.go index 6465eb47f..cc87dd826 100644 --- a/receiver/azureblobrehydrationreceiver/factory.go +++ b/receiver/azureblobrehydrationreceiver/factory.go @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote import ( "context" "errors" - "time" "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/metadata" "go.opentelemetry.io/collector/component" @@ -28,6 +27,9 @@ import ( // errImproperCfgType error for when an invalid config type is passed to receiver creation funcs var errImproperCfgType = errors.New("improper config type") +const defaultBatchSize = 30 +const defaultPageSize = 1000 + // NewFactory creates a new receiver factory func NewFactory() receiver.Factory { return receiver.NewFactory( @@ -43,8 +45,8 @@ func NewFactory() receiver.Factory { func createDefaultConfig() component.Config { return &Config{ DeleteOnRead: false, - PollInterval: time.Minute, - PollTimeout: time.Second * 30, + BatchSize: defaultBatchSize, + PageSize: defaultPageSize, } } diff --git a/receiver/azureblobrehydrationreceiver/factory_test.go b/receiver/azureblobrehydrationreceiver/factory_test.go index ce8e95ff0..129bf32bb 100644 --- a/receiver/azureblobrehydrationreceiver/factory_test.go +++ b/receiver/azureblobrehydrationreceiver/factory_test.go @@ -16,7 +16,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote import ( "testing" - "time" "github.com/stretchr/testify/require" ) @@ -24,8 +23,8 @@ import ( func Test_createDefaultConfig(t *testing.T) { expectedCfg := &Config{ DeleteOnRead: false, - PollInterval: time.Minute, - PollTimeout: time.Second * 30, + BatchSize: defaultBatchSize, + PageSize: defaultPageSize, } componentCfg := createDefaultConfig() diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go 
b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index dd5373c75..825e84a8a 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -19,6 +19,7 @@ import ( "context" "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" ) @@ -32,60 +33,75 @@ type BlobInfo struct { // //go:generate mockery --name BlobClient --output ./mocks --with-expecter --filename mock_blob_client.go --structname MockBlobClient type BlobClient interface { - // ListBlobs returns a list of blobInfo objects present in the container with the given prefix - ListBlobs(ctx context.Context, container string, prefix, marker *string) ([]*BlobInfo, *string, error) - // DownloadBlob downloads the contents of the blob into the supplied buffer. // It will return the count of bytes used in the buffer. DownloadBlob(ctx context.Context, container, blobPath string, buf []byte) (int64, error) // DeleteBlob deletes the blob in the specified container DeleteBlob(ctx context.Context, container, blobPath string) error + + // StreamBlobs will stream BlobInfo to the blobChan and errors to the errChan, generally if an errChan gets an item + // then the stream should be stopped + StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan []*BlobInfo, doneChan chan struct{}) +} + +type blobClient interface { + NewListBlobsFlatPager(containerName string, options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse] + DownloadBuffer(ctx context.Context, containerName string, blobPath string, buffer []byte, options *azblob.DownloadBufferOptions) (int64, error) + DeleteBlob(ctx context.Context, containerName string, blobPath string, options *azblob.DeleteBlobOptions) (azblob.DeleteBlobResponse, error) } +var _ blobClient = &azblob.Client{} + // AzureClient is an 
implementation of the BlobClient for Azure type AzureClient struct { - azClient *azblob.Client + azClient blobClient + batchSize int + pageSize int32 } // NewAzureBlobClient creates a new azureBlobClient with the given connection string -func NewAzureBlobClient(connectionString string) (BlobClient, error) { +func NewAzureBlobClient(connectionString string, batchSize, pageSize int) (BlobClient, error) { azClient, err := azblob.NewClientFromConnectionString(connectionString, nil) if err != nil { return nil, err } - return &AzureClient{ - azClient: azClient, + azClient: azClient, + batchSize: batchSize, + pageSize: int32(pageSize), }, nil } -// contentLengthKey key for the content length metadata -const contentLengthKey = "ContentLength" +// StreamBlobs will stream blobs to the blobChan and errors to the errChan, generally if an errChan gets an item +// then the stream should be stopped +func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan []*BlobInfo, doneChan chan struct{}) { + var marker *string -// ListBlobs returns a list of blobInfo objects present in the container with the given prefix -func (a *AzureClient) ListBlobs(ctx context.Context, container string, prefix, marker *string) ([]*BlobInfo, *string, error) { - listOptions := &azblob.ListBlobsFlatOptions{ - Marker: marker, - Prefix: prefix, - } + pager := a.azClient.NewListBlobsFlatPager(container, &azblob.ListBlobsFlatOptions{ + Marker: marker, + Prefix: prefix, + MaxResults: &a.pageSize, + }) - pager := a.azClient.NewListBlobsFlatPager(container, listOptions) - - var nextMarker *string - blobs := make([]*BlobInfo, 0) for pager.More() { + select { + case <-ctx.Done(): + return + default: + } + resp, err := pager.NextPage(ctx) if err != nil { - return nil, nil, fmt.Errorf("listBlobs: %w", err) + errChan <- fmt.Errorf("error streaming blobs: %w", err) + return } + batch := []*BlobInfo{} for _, blob := range resp.Segment.BlobItems { - // Skip 
deleted blobs if blob.Deleted != nil && *blob.Deleted { continue } - // All blob fields are pointers so check all pointers we need before we try to process it if blob.Name == nil || blob.Properties == nil || blob.Properties.ContentLength == nil { continue } @@ -94,13 +110,17 @@ func (a *AzureClient) ListBlobs(ctx context.Context, container string, prefix, m Name: *blob.Name, Size: *blob.Properties.ContentLength, } - - blobs = append(blobs, info) + batch = append(batch, info) + if len(batch) == int(a.batchSize) { + blobChan <- batch + batch = []*BlobInfo{} + } } - nextMarker = resp.NextMarker + + blobChan <- batch } - return blobs, nextMarker, nil + close(doneChan) } // DownloadBlob downloads the contents of the blob into the supplied buffer. diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go new file mode 100644 index 000000000..8fa446bdc --- /dev/null +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go @@ -0,0 +1,151 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package azureblob + +import ( + "context" + "errors" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestNewAzureBlobClient(t *testing.T) { + tests := []struct { + name string + connectionStr string + batchSize int + pageSize int + expectedError bool + }{ + { + name: "Invalid connection string", + connectionStr: "invalid", + batchSize: 100, + pageSize: 1000, + expectedError: true, + }, + { + name: "Valid connection string", + connectionStr: "DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;", + batchSize: 100, + pageSize: 1000, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewAzureBlobClient(tt.connectionStr, tt.batchSize, tt.pageSize) + if tt.expectedError { + require.Error(t, err) + require.Nil(t, client) + } else { + require.NoError(t, err) + require.NotNil(t, client) + } + }) + } +} + +func TestDownloadBlob(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + testData := []byte("test data content") + buf := make([]byte, 1024) + + mockClient.On("DownloadBuffer", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + buf := args.Get(3).([]byte) + copy(buf, testData) + }).Return(int64(len(testData)), nil) + + t.Run("successful download", func(t *testing.T) { + bytesDownloaded, err := client.DownloadBlob(ctx, container, blobPath, buf) + require.NoError(t, err) + require.Equal(t, 
int64(len(testData)), bytesDownloaded) + require.Equal(t, string(testData), string(buf[:len(testData)])) + }) +} + +func TestDeleteBlobSuccess(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + + mockClient.On("DeleteBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(azblob.DeleteBlobResponse{}, nil) + err := client.DeleteBlob(ctx, container, blobPath) + require.NoError(t, err) + +} + +func TestDeleteBlobFailure(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + + mockClient.On("DeleteBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(azblob.DeleteBlobResponse{}, errors.New("failed to delete")) + err := client.DeleteBlob(ctx, container, blobPath) + require.Error(t, err) + require.Equal(t, "failed to delete", err.Error()) +} + +// mockAzureClient is a mock implementation of the Azure blob client +type mockAzureClient struct { + mock.Mock +} + +func (m *mockAzureClient) NewListBlobsFlatPager(containerName string, options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse] { + args := m.Called(containerName, options) + return args.Get(0).(*runtime.Pager[azblob.ListBlobsFlatResponse]) +} + +func (m *mockAzureClient) DownloadBuffer(ctx context.Context, containerName string, blobPath string, buffer []byte, options *azblob.DownloadBufferOptions) (int64, error) { + args := m.Called(ctx, containerName, blobPath, buffer, options) + return args.Get(0).(int64), args.Error(1) +} + +func (m *mockAzureClient) DeleteBlob(ctx context.Context, 
containerName string, blobPath string, options *azblob.DeleteBlobOptions) (azblob.DeleteBlobResponse, error) { + args := m.Called(ctx, containerName, blobPath, options) + return args.Get(0).(azblob.DeleteBlobResponse), args.Error(1) +} diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go index 8e24aa1c5..32b5cdc7d 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.51.0. DO NOT EDIT. package mocks @@ -130,73 +130,41 @@ func (_c *MockBlobClient_DownloadBlob_Call) RunAndReturn(run func(context.Contex return _c } -// ListBlobs provides a mock function with given fields: ctx, container, prefix, marker -func (_m *MockBlobClient) ListBlobs(ctx context.Context, container string, prefix *string, marker *string) ([]*azureblob.BlobInfo, *string, error) { - ret := _m.Called(ctx, container, prefix, marker) - - if len(ret) == 0 { - panic("no return value specified for ListBlobs") - } - - var r0 []*azureblob.BlobInfo - var r1 *string - var r2 error - if rf, ok := ret.Get(0).(func(context.Context, string, *string, *string) ([]*azureblob.BlobInfo, *string, error)); ok { - return rf(ctx, container, prefix, marker) - } - if rf, ok := ret.Get(0).(func(context.Context, string, *string, *string) []*azureblob.BlobInfo); ok { - r0 = rf(ctx, container, prefix, marker) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*azureblob.BlobInfo) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, *string, *string) *string); ok { - r1 = rf(ctx, container, prefix, marker) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(*string) - } - } - - if rf, ok := ret.Get(2).(func(context.Context, string, *string, *string) 
error); ok { - r2 = rf(ctx, container, prefix, marker) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 +// StreamBlobs provides a mock function with given fields: ctx, container, prefix, errChan, blobChan, doneChan +func (_m *MockBlobClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan []*azureblob.BlobInfo, doneChan chan struct{}) { + _m.Called(ctx, container, prefix, errChan, blobChan, doneChan) } -// MockBlobClient_ListBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListBlobs' -type MockBlobClient_ListBlobs_Call struct { +// MockBlobClient_StreamBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamBlobs' +type MockBlobClient_StreamBlobs_Call struct { *mock.Call } -// ListBlobs is a helper method to define mock.On call +// StreamBlobs is a helper method to define mock.On call // - ctx context.Context // - container string // - prefix *string -// - marker *string -func (_e *MockBlobClient_Expecter) ListBlobs(ctx interface{}, container interface{}, prefix interface{}, marker interface{}) *MockBlobClient_ListBlobs_Call { - return &MockBlobClient_ListBlobs_Call{Call: _e.mock.On("ListBlobs", ctx, container, prefix, marker)} +// - errChan chan error +// - blobChan chan []*azureblob.BlobInfo +// - doneChan chan struct{} +func (_e *MockBlobClient_Expecter) StreamBlobs(ctx interface{}, container interface{}, prefix interface{}, errChan interface{}, blobChan interface{}, doneChan interface{}) *MockBlobClient_StreamBlobs_Call { + return &MockBlobClient_StreamBlobs_Call{Call: _e.mock.On("StreamBlobs", ctx, container, prefix, errChan, blobChan, doneChan)} } -func (_c *MockBlobClient_ListBlobs_Call) Run(run func(ctx context.Context, container string, prefix *string, marker *string)) *MockBlobClient_ListBlobs_Call { +func (_c *MockBlobClient_StreamBlobs_Call) Run(run func(ctx context.Context, container string, 
prefix *string, errChan chan error, blobChan chan []*azureblob.BlobInfo, doneChan chan struct{})) *MockBlobClient_StreamBlobs_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(*string)) + run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(chan error), args[4].(chan []*azureblob.BlobInfo), args[5].(chan struct{})) }) return _c } -func (_c *MockBlobClient_ListBlobs_Call) Return(_a0 []*azureblob.BlobInfo, _a1 *string, _a2 error) *MockBlobClient_ListBlobs_Call { - _c.Call.Return(_a0, _a1, _a2) +func (_c *MockBlobClient_StreamBlobs_Call) Return() *MockBlobClient_StreamBlobs_Call { + _c.Call.Return() return _c } -func (_c *MockBlobClient_ListBlobs_Call) RunAndReturn(run func(context.Context, string, *string, *string) ([]*azureblob.BlobInfo, *string, error)) *MockBlobClient_ListBlobs_Call { - _c.Call.Return(run) +func (_c *MockBlobClient_StreamBlobs_Call) RunAndReturn(run func(context.Context, string, *string, chan error, chan []*azureblob.BlobInfo, chan struct{})) *MockBlobClient_StreamBlobs_Call { + _c.Run(run) return _c } diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index f55eefb09..b1b323b46 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -19,6 +19,8 @@ import ( "errors" "fmt" "path/filepath" + "sync" + "sync/atomic" "time" "github.com/observiq/bindplane-otel-collector/internal/rehydration" @@ -40,15 +42,24 @@ type rehydrationReceiver struct { azureClient azureblob.BlobClient supportedTelemetry pipeline.Signal consumer rehydration.Consumer + checkpoint *rehydration.CheckPoint checkpointStore rehydration.CheckpointStorer + blobChan chan []*azureblob.BlobInfo + errChan chan error + doneChan chan struct{} + + // mutexes for ensuring a thread safe checkpoint + mut *sync.Mutex + wg *sync.WaitGroup + + lastBlob *azureblob.BlobInfo + 
lastBlobTime *time.Time + startingTime time.Time endingTime time.Time - doneChan chan struct{} - started bool - ctx context.Context - cancelFunc context.CancelCauseFunc + cancelFunc context.CancelFunc } // newMetricsReceiver creates a new metrics specific receiver. @@ -90,9 +101,13 @@ func newTracesReceiver(id component.ID, logger *zap.Logger, cfg *Config, nextCon return r, nil } +// factor of buffered channel size +// number of blobs to process at a time is blobChanSize * batchSize +const blobChanSize = 5 + // newRehydrationReceiver creates a new rehydration receiver func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (*rehydrationReceiver, error) { - azureClient, err := newAzureBlobClient(cfg.ConnectionString) + azureClient, err := newAzureBlobClient(cfg.ConnectionString, cfg.BatchSize, cfg.PageSize) if err != nil { return nil, fmt.Errorf("new Azure client: %w", err) } @@ -109,170 +124,203 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (* return nil, fmt.Errorf("invalid ending_time timestamp: %w", err) } - ctx, cancel := context.WithCancelCause(context.Background()) - return &rehydrationReceiver{ logger: logger, id: id, cfg: cfg, azureClient: azureClient, - doneChan: make(chan struct{}), checkpointStore: rehydration.NewNopStorage(), startingTime: startingTime, endingTime: endingTime, - ctx: ctx, - cancelFunc: cancel, + blobChan: make(chan []*azureblob.BlobInfo, blobChanSize), + errChan: make(chan error), + doneChan: make(chan struct{}), + mut: &sync.Mutex{}, + wg: &sync.WaitGroup{}, }, nil } // Start starts the rehydration receiver func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) error { + r.logAnyDeprecationWarnings() if r.cfg.StorageID != nil { checkpointStore, err := rehydration.NewCheckpointStorage(ctx, host, *r.cfg.StorageID, r.id, r.supportedTelemetry) if err != nil { return fmt.Errorf("NewCheckpointStorage: %w", err) } - r.checkpointStore = checkpointStore } - 
r.started = true - go r.scrape() + cancelCtx, cancel := context.WithCancel(ctx) + r.cancelFunc = cancel + + go r.streamRehydrateBlobs(cancelCtx) return nil } +func (r *rehydrationReceiver) logAnyDeprecationWarnings() { + if r.cfg.PollInterval != 0 { + r.logger.Warn("poll_interval is no longer recognized and will be removed in a future release. batch_size/page_size should be used instead") + } + + if r.cfg.PollTimeout != 0 { + r.logger.Warn("poll_timeout is no longer recognized and will be removed in a future release. batch_size/page_size should be used instead") + } +} + // Shutdown shuts down the rehydration receiver func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { - r.cancelFunc(errors.New("shutdown")) - var err error - - // If we have called started then close and wait for goroutine to finish - if r.started { - select { - case <-ctx.Done(): - err = ctx.Err() - case <-r.doneChan: - } + if r.cancelFunc != nil { + r.cancelFunc() } - err = errors.Join(err, r.checkpointStore.Close(ctx)) + shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() - return err -} + // signal shutdown intent + close(r.doneChan) -// emptyPollLimit is the number of consecutive empty polling cycles that can -// occur before we stop polling. 
-const emptyPollLimit = 3 + // wait for any in-progress operations to finish + done := make(chan struct{}) + go func() { + r.wg.Wait() + close(done) + }() -// scrape scrapes the Azure api on interval -func (r *rehydrationReceiver) scrape() { - defer close(r.doneChan) - ticker := time.NewTicker(r.cfg.PollInterval) - defer ticker.Stop() + select { + case <-done: + case <-shutdownCtx.Done(): + return fmt.Errorf("shutdown timeout: %w", shutdownCtx.Err()) + } - var marker *string + var errs error + if err := r.makeCheckpoint(shutdownCtx); err != nil { + errs = errors.Join(errs, fmt.Errorf("error while saving checkpoint: %w", err)) + } + + if err := r.checkpointStore.Close(shutdownCtx); err != nil { + errs = errors.Join(errs, fmt.Errorf("error while closing checkpoint store: %w", err)) + } + + return errs +} - // load the previous checkpoint. If not exist should return zero value for time - checkpoint, err := r.checkpointStore.LoadCheckPoint(r.ctx, r.checkpointKey()) +func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { + checkpoint, err := r.checkpointStore.LoadCheckPoint(ctx, r.checkpointKey()) if err != nil { r.logger.Warn("Error loading checkpoint, continuing without a previous checkpoint", zap.Error(err)) checkpoint = rehydration.NewCheckpoint() } + r.checkpoint = checkpoint - // Call once before the loop to ensure we do a collection before the first ticker - numBlobsRehydrated := r.rehydrateBlobs(checkpoint, marker) - emptyBlobCounter := checkBlobCount(numBlobsRehydrated, 0) + var prefix *string + if r.cfg.RootFolder != "" { + prefix = &r.cfg.RootFolder + } + + startTime := time.Now() + r.logger.Info("Starting rehydration", zap.Time("startTime", startTime)) + + go r.azureClient.StreamBlobs(ctx, r.cfg.Container, prefix, r.errChan, r.blobChan, r.doneChan) for { select { - case <-r.ctx.Done(): + case <-ctx.Done(): + r.logger.Info("Context cancelled, stopping rehydration", zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) + return + 
case <-r.doneChan: + r.logger.Info("Finished rehydrating blobs", zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) + return + case err := <-r.errChan: + r.logger.Error("Error streaming blobs, stopping rehydration", zap.Error(err), zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return - case <-ticker.C: - // Polling for blobs has egress charges so we want to stop polling - // after we stop finding blobs. - if emptyBlobCounter == emptyPollLimit { + case br, ok := <-r.blobChan: + if !ok { + r.logger.Info("Finished rehydrating blobs", zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) return } - - numBlobsRehydrated := r.rehydrateBlobs(checkpoint, marker) - emptyBlobCounter = checkBlobCount(numBlobsRehydrated, emptyBlobCounter) + numProcessedBlobs := r.rehydrateBlobs(ctx, br) + r.logger.Debug("Processed a number of blobs", zap.Int("num_processed_blobs", numProcessedBlobs)) } } } -// rehydrateBlobs pulls blob paths from the UI and if they are within the specified -// time range then the blobs will be downloaded and rehydrated. -// The passed in checkpoint and marker will be updated and should be used in the next iteration. 
-// The count of blobs processed will be returned -func (r *rehydrationReceiver) rehydrateBlobs(checkpoint *rehydration.CheckPoint, marker *string) (numBlobsRehydrated int) { - var prefix *string - if r.cfg.RootFolder != "" { - prefix = &r.cfg.RootFolder - } - - ctxTimeout, cancel := context.WithTimeout(r.ctx, r.cfg.PollTimeout) - defer cancel() - - // get blobs from Azure - blobs, nextMarker, err := r.azureClient.ListBlobs(ctxTimeout, r.cfg.Container, prefix, marker) - if err != nil { - r.logger.Error("Failed to list blobs", zap.Error(err)) - return - } - - marker = nextMarker - +func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) (numProcessedBlobs int) { // Go through each blob and parse it's path to determine if we should consume it or not + r.logger.Debug("Received a batch of blobs, parsing through them to determine if they should be rehydrated", zap.Int("num_blobs", len(blobs))) + processedBlobCount := atomic.Int64{} for _, blob := range blobs { + select { + case <-ctx.Done(): + break + default: + } + blobTime, telemetryType, err := rehydration.ParseEntityPath(blob.Name) switch { case errors.Is(err, rehydration.ErrInvalidEntityPath): r.logger.Debug("Skipping Blob, non-matching blob path", zap.String("blob", blob.Name)) case err != nil: r.logger.Error("Error processing blob path", zap.String("blob", blob.Name), zap.Error(err)) - case checkpoint.ShouldParse(*blobTime, blob.Name): + case r.checkpoint.ShouldParse(*blobTime, blob.Name): // if the blob is not in the specified time range or not of the telemetry type supported by this receiver // then skip consuming it. 
if !rehydration.IsInTimeRange(*blobTime, r.startingTime, r.endingTime) || telemetryType != r.supportedTelemetry { continue } - // Process and consume the blob at the given path - if err := r.processBlob(blob); err != nil { - r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) - continue - } - - numBlobsRehydrated++ - - // Update and save the checkpoint with the most recently processed blob - checkpoint.UpdateCheckpoint(*blobTime, blob.Name) - if err := r.checkpointStore.SaveCheckpoint(r.ctx, r.checkpointKey(), checkpoint); err != nil { - r.logger.Error("Error while saving checkpoint", zap.Error(err)) - } + r.wg.Add(1) + go func() { + defer r.wg.Done() + select { + case <-ctx.Done(): + return + default: + } + // Process and consume the blob at the given path + if err := r.processBlob(ctx, blob); err != nil { + // If the error is because the context was canceled, then we don't want to log it + if !errors.Is(err, context.Canceled) { + r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) + } + return + } + processedBlobCount.Add(1) - // Delete blob if configured to do so - if r.cfg.DeleteOnRead { - if err := r.azureClient.DeleteBlob(r.ctx, r.cfg.Container, blob.Name); err != nil { + // Delete blob if configured to do so + if err := r.conditionallyDeleteBlob(ctx, blob); err != nil { r.logger.Error("Error while attempting to delete blob", zap.String("blob", blob.Name), zap.Error(err)) } - } + + if r.lastBlobTime == nil || r.lastBlobTime.Before(*blobTime) { + r.mut.Lock() + r.lastBlob = blob + r.lastBlobTime = blobTime + r.mut.Unlock() + } + }() } } - return + r.wg.Wait() + + if err := r.makeCheckpoint(ctx); err != nil { + r.logger.Error("Error while saving checkpoint", zap.Error(err)) + } + + return int(processedBlobCount.Load()) } // processBlob does the following: // 1. Downloads the blob // 2. Decompresses the blob if applicable // 3. 
Pass the blob to the consumer -func (r *rehydrationReceiver) processBlob(blob *azureblob.BlobInfo) error { +func (r *rehydrationReceiver) processBlob(ctx context.Context, blob *azureblob.BlobInfo) error { // Allocate a buffer the size of the blob. If the buffer isn't big enough download errors. blobBuffer := make([]byte, blob.Size) - size, err := r.azureClient.DownloadBlob(r.ctx, r.cfg.Container, blob.Name, blobBuffer) + size, err := r.azureClient.DownloadBlob(ctx, r.cfg.Container, blob.Name, blobBuffer) if err != nil { return fmt.Errorf("download blob: %w", err) } @@ -291,7 +339,7 @@ func (r *rehydrationReceiver) processBlob(blob *azureblob.BlobInfo) error { return fmt.Errorf("unsupported file type: %s", ext) } - if err := r.consumer.Consume(r.ctx, blobBuffer); err != nil { + if err := r.consumer.Consume(ctx, blobBuffer); err != nil { return fmt.Errorf("consume: %w", err) } return nil @@ -305,16 +353,20 @@ func (r *rehydrationReceiver) checkpointKey() string { return fmt.Sprintf("%s_%s_%s", checkpointStorageKey, r.id, r.supportedTelemetry.String()) } -// checkBlobCount checks the number of blobs rehydrated and the current state of the -// empty counter. If zero blobs were rehydrated increment the counter. -// If there were blobs rehydrated reset the counter as we want to track consecutive zero sized polls. 
-func checkBlobCount(numBlobsRehydrated, emptyBlobsCounter int) int { - switch { - case emptyBlobsCounter == emptyPollLimit: // If we are at the limit return the limit - return emptyPollLimit - case numBlobsRehydrated == 0: // If no blobs were rehydrated then increment the empty blobs counter - return emptyBlobsCounter + 1 - default: // Default case is numBlobsRehydrated > 0 so reset emptyBlobsCounter to 0 - return 0 +func (r *rehydrationReceiver) makeCheckpoint(ctx context.Context) error { + if r.lastBlob == nil || r.lastBlobTime == nil { + return nil + } + r.logger.Debug("Making checkpoint", zap.String("blob", r.lastBlob.Name), zap.Time("time", *r.lastBlobTime)) + r.mut.Lock() + defer r.mut.Unlock() + r.checkpoint.UpdateCheckpoint(*r.lastBlobTime, r.lastBlob.Name) + return r.checkpointStore.SaveCheckpoint(ctx, r.checkpointKey(), r.checkpoint) +} + +func (r *rehydrationReceiver) conditionallyDeleteBlob(ctx context.Context, blob *azureblob.BlobInfo) error { + if !r.cfg.DeleteOnRead { + return nil } + return r.azureClient.DeleteBlob(ctx, r.cfg.Container, blob.Name) } diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index 3144d6347..3a7af9e6b 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -19,14 +19,11 @@ import ( "compress/gzip" "context" "errors" - "sync/atomic" + "strings" + "sync" "testing" "time" - "github.com/observiq/bindplane-otel-collector/internal/rehydration" - "github.com/observiq/bindplane-otel-collector/internal/testutils" - "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob" - blobmocks "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -34,6 +31,12 @@ import ( 
"go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pipeline" "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + "github.com/observiq/bindplane-otel-collector/internal/rehydration" + "github.com/observiq/bindplane-otel-collector/internal/testutils" + "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob" + blobmocks "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks" ) func Test_newMetricsReceiver(t *testing.T) { @@ -111,32 +114,10 @@ func Test_fullRehydration(t *testing.T) { cfg := &Config{ StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T18:00", - PollInterval: 10 * time.Millisecond, Container: "container", DeleteOnRead: false, } - t.Run("empty blob polling", func(t *testing.T) { - var listCounter atomic.Int32 - - // Setup mocks - mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Times(3).Return([]*azureblob.BlobInfo{}, nil, nil). 
- Run(func(_ mock.Arguments) { - listCounter.Add(1) - }) - - // Create new receiver - testConsumer := &consumertest.MetricsSink{} - r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - - checkFunc := func() bool { - return listCounter.Load() == 3 - } - runRehydrationValidateTest(t, r, checkFunc) - }) - t.Run("metrics", func(t *testing.T) { // Test data metrics, jsonBytes := testutils.GenerateTestMetrics(t) @@ -153,24 +134,26 @@ func Test_fullRehydration(t *testing.T) { }, } + // Create new receiver + targetBlob := returnedBlobInfo[0] // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + testConsumer := &consumertest.MetricsSink{} + r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- returnedBlobInfo + }) + mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) - copy(buf, jsonBytes) - return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.MetricsSink{} - r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.DataPointCount() == metrics.DataPointCount() } @@ -195,23 +178,23 @@ func Test_fullRehydration(t *testing.T) { } targetBlob := returnedBlobInfo[0] + mockClient := setNewAzureBlobClient(t) + + // Create new receiver + testConsumer := &consumertest.TracesSink{} + r, err := newTracesReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) // Setup mocks - mockClient 
:= setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- returnedBlobInfo + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) - copy(buf, jsonBytes) - return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.TracesSink{} - r, err := newTracesReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.SpanCount() == traces.SpanCount() } @@ -239,7 +222,14 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- returnedBlobInfo + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -248,11 +238,6 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - 
require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -281,7 +266,14 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- returnedBlobInfo + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -290,11 +282,6 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -303,10 +290,12 @@ func Test_fullRehydration(t *testing.T) { }) t.Run("Delete on Read", func(t *testing.T) { - cfg.DeleteOnRead = true - t.Cleanup(func() { - cfg.DeleteOnRead = false - }) + deleteCfg := &Config{ + StartingTime: cfg.StartingTime, + EndingTime: cfg.EndingTime, + Container: cfg.Container, + DeleteOnRead: true, + } // Test data logs, jsonBytes := testutils.GenerateTestLogs(t) @@ -327,7 +316,14 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := 
&consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, deleteCfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- returnedBlobInfo + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -335,12 +331,8 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - mockClient.EXPECT().DeleteBlob(mock.Anything, cfg.Container, targetBlob.Name).Return(nil) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) + mockClient.EXPECT().DeleteBlob(mock.Anything, cfg.Container, targetBlob.Name).Return(nil) checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() @@ -375,7 +367,15 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- returnedBlobInfo + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -384,11 +384,6 @@ func Test_fullRehydration(t 
*testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -496,10 +491,9 @@ func Test_processBlob(t *testing.T) { }, consumer: mockConsumer, azureClient: mockClient, - ctx: context.Background(), } - err := r.processBlob(tc.info) + err := r.processBlob(context.Background(), tc.info) if tc.expectedErr == nil { require.NoError(t, err) } else { @@ -509,6 +503,47 @@ func Test_processBlob(t *testing.T) { } } +func TestLogsDeprecationWarnings(t *testing.T) { + mockClient := setNewAzureBlobClient(t) + + testLogger, ol := observer.New(zap.WarnLevel) + + r := &rehydrationReceiver{ + logger: zap.New(testLogger), + cfg: &Config{ + StartingTime: "2023-10-02T17:00", + EndingTime: "2023-10-02T17:01", + PollInterval: 1 * time.Second, + PollTimeout: 1 * time.Second, + }, + azureClient: mockClient, + blobChan: make(chan []*azureblob.BlobInfo), + errChan: make(chan error), + doneChan: make(chan struct{}), + mut: &sync.Mutex{}, + checkpointStore: rehydration.NewNopStorage(), + } + mockClient.On("StreamBlobs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + close(r.doneChan) + }) + + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + + require.Eventually(t, func() bool { + foundBothLogs := false + foundPollInterval := false + for _, log := range ol.All() { + if strings.Contains(log.Message, "poll_interval is no longer recognized and will be removed in a future release. batch_size/page_size should be used instead") { + foundBothLogs = true + } + if strings.Contains(log.Message, "poll_interval is no longer recognized and will be removed in a future release. 
batch_size/page_size should be used instead") { + foundPollInterval = true + } + } + return foundBothLogs && foundPollInterval + }, 10*time.Second, 1*time.Second) +} + // setNewAzureBlobClient helper function used to set the newAzureBlobClient // function with a mock and return the mock. func setNewAzureBlobClient(t *testing.T) *blobmocks.MockBlobClient { @@ -517,7 +552,7 @@ func setNewAzureBlobClient(t *testing.T) *blobmocks.MockBlobClient { mockClient := blobmocks.NewMockBlobClient(t) - newAzureBlobClient = func(_ string) (azureblob.BlobClient, error) { + newAzureBlobClient = func(_ string, _ int, _ int) (azureblob.BlobClient, error) { return mockClient, nil }