Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactor azureeventhubrehydrationreceiver to stream blobs as to not lock up on larger environments (BPOP-831) #2098

Merged
merged 21 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions receiver/azureblobrehydrationreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,30 @@ This is not a traditional receiver that continually produces data but rather reh
- Traces

## How it works
1. The receiver polls blob storage for all blobs in the specified container.
1. The receiver polls blob storage for pages of blobs in the specified container. There is no current way of specifying a time range to rehydrate so any blobs outside fo the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration.
dpaasman00 marked this conversation as resolved.
Show resolved Hide resolved
2. The receiver will parse each blob's path to determine if it matches a path created by the [Azure Blob Exporter](../../exporter/azureblobexporter/README.md#blob-path).
3. If the blob path is from the exporter, the receiver will parse the timestamp represented by the path.
4. If the timestamp is within the configured range the receiver will download the blob and parse its contents into OTLP data.

a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip.

## Configuration
| Field | Type | Default | Required | Description |
|--------------------|-----------|------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. |
| container | string | | `true` | The name of the container to rehydrate from. |
| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. |
| starting_time | string | | `true ` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| ending_time | string | | `true ` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| delete_on_read | bool | `false` | `false ` | If `true` the blob will be deleted after being rehydrated. |
| poll_interval | string | `1m` | `false ` | How often to read a new set of blobs. This value is mostly to control how often the blob API is called to ensure once rehydration is done the receiver isn't making too many API calls. |
| poll_timeout | string | `30s` | `false ` | The timeout used when reading blobs from Azure. |
| storage | string | | `false ` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. |

| Field | Type | Default | Required | Description |
|-------|------|---------|----------|-------------|
| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. |
| container | string | | `true` | The name of the container to rehydrate from. |
| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. |
| starting_time | string | | `true` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| ending_time | string | | `true` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| delete_on_read | bool | `false` | `false` | If `true` the blob will be deleted after being rehydrated. |
| storage | string | | `false` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. |
| poll_interval* | string | | `false` | The interval at which the Azure API is scanned for blobs. |
| poll_timeout* | string | | `false` | The timeout for the Azure API to scan for blobs. |
| batch_size | int | `30` | `false` | The number of blobs to download and process in the pipeline simultaneously. This parameter directly impacts performance by controlling the concurrent blob download limit. |
| page_size | int | `1000` | `false` | The maximum number of blob information to request in a single API call. |

> Deprecated*: `poll_interval` and `poll_timeout` are no longer supported and `batch_size`/`page_size` should be used instead.

## Example Configuration

Expand All @@ -41,6 +46,7 @@ This configuration specifies a `connection_string`, `container`, `starting_time`
This will rehydrate all blobs in the container `my-container` that have a path that represents they were created between `1:00pm` and `2:30pm` UTC time on `October 1, 2023`.

Such a path could look like the following:

```
year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json
year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json
Expand All @@ -53,25 +59,26 @@ azureblobrehydration:
container: "my-container"
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
batch_size: 100
page_size: 1000
```

### Using Storage Extension Configuration

This configuration shows using a storage extension to track rehydration progress over agent restarts. The `storage` field is set to the component ID of the storage extension.


```yaml
extensions:
file_storage:
directory: $OIQ_OTEL_COLLECTOR_HOME/storage

receivers:
azureblobrehydration:
connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
container: "my-container"
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
storage: "file_storage"
batch_size: 100
page_size: 1000
```

### Root Folder Configuration
Expand All @@ -93,6 +100,8 @@ azureblobrehydration:
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
root_folder: "root"
batch_size: 100
page_size: 1000
```

### Delete on read Configuration
Expand All @@ -106,4 +115,15 @@ azureblobrehydration:
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
delete_on_read: true
batch_size: 100
page_size: 1000
```

## Deprecated Configuration

The following configuration fields are deprecated and will be removed in a future release.

| Field | Deprecated |
|-------|----------|
| poll_interval | true |
| poll_timeout | true |
22 changes: 16 additions & 6 deletions receiver/azureblobrehydrationreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (

// Config is the configuration for the azure blob rehydration receiver
type Config struct {
// BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 30)
BatchSize int `mapstructure:"batch_size"`
jsirianni marked this conversation as resolved.
Show resolved Hide resolved

// ConnectionString is the Azure Blob Storage connection key,
// which can be found in the Azure Blob Storage resource on the Azure Portal. (no default)
ConnectionString string `mapstructure:"connection_string"`
Expand All @@ -45,19 +48,30 @@ type Config struct {
// Default value of false
DeleteOnRead bool `mapstructure:"delete_on_read"`

// Deprecated: PollInterval is no longer required due to streaming blobs for processing.
// if a value is provided, validation will throw an error
// PollInterval is the interval at which the Azure API is scanned for blobs.
// Default value of 1m
PollInterval time.Duration `mapstructure:"poll_interval"`

// Deprecated: PollTimeout is no longer required due to streaming blobs for processing.
// if a value is provided, validation will throw an error
// PollTimeout is the timeout for the Azure API to scan for blobs.
PollTimeout time.Duration `mapstructure:"poll_timeout"`

// PageSize is the number of blobs to request from the Azure API at a time. (default 1000)
PageSize int `mapstructure:"page_size"`

// ID of the storage extension to use for storing progress
StorageID *component.ID `mapstructure:"storage"`
}

// Validate validates the config
func (c *Config) Validate() error {
if c.BatchSize < 1 {
return errors.New("batch_size must be greater than 0")
}

if c.ConnectionString == "" {
return errors.New("connection_string is required")
}
Expand All @@ -81,12 +95,8 @@ func (c *Config) Validate() error {
return errors.New("ending_time must be at least one minute after starting_time")
}

if c.PollInterval < time.Second {
return errors.New("poll_interval must be at least one second")
}

if c.PollTimeout < time.Second {
return errors.New("poll_timeout must be at least one second")
if c.PageSize < 1 {
return errors.New("page_size must be greater than 0")
}

return nil
Expand Down
64 changes: 39 additions & 25 deletions receiver/azureblobrehydrationreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote
import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -37,8 +36,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("connection_string is required"),
},
Expand All @@ -51,8 +50,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("container is required"),
},
Expand All @@ -65,8 +64,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("starting_time is invalid: missing value"),
},
Expand All @@ -79,8 +78,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("ending_time is invalid: missing value"),
},
Expand All @@ -93,8 +92,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "invalid_time",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("starting_time is invalid: invalid timestamp"),
},
Expand All @@ -107,8 +106,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "invalid_time",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("ending_time is invalid: invalid timestamp"),
},
Expand All @@ -121,38 +120,53 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T16:00",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("ending_time must be at least one minute after starting_time"),
},
{
desc: "Bad poll_interval",
desc: "with deprecated poll_interval",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Millisecond,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("poll_interval must be at least one second"),
// expect no error until future release where poll_interval is removed
expectErr: nil,
},
{
desc: "Bad batch_size",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
BatchSize: 0,
PageSize: 1000,
},
expectErr: errors.New("batch_size must be greater than 0"),
},
{
desc: "Bad poll_timeout",
desc: "Bad page_size",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second * 2,
PollTimeout: time.Millisecond,
BatchSize: 30,
PageSize: 0,
},
expectErr: errors.New("poll_timeout must be at least one second"),
expectErr: errors.New("page_size must be greater than 0"),
},
{
desc: "Valid config",
Expand All @@ -163,8 +177,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: nil,
},
Expand Down
8 changes: 5 additions & 3 deletions receiver/azureblobrehydrationreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote
import (
"context"
"errors"
"time"

"github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/metadata"
"go.opentelemetry.io/collector/component"
Expand All @@ -28,6 +27,9 @@ import (
// errImproperCfgType error for when an invalid config type is passed to receiver creation funcs
var errImproperCfgType = errors.New("improper config type")

const defaultBatchSize = 30
const defaultPageSize = 1000

// NewFactory creates a new receiver factory
func NewFactory() receiver.Factory {
return receiver.NewFactory(
Expand All @@ -43,8 +45,8 @@ func NewFactory() receiver.Factory {
func createDefaultConfig() component.Config {
return &Config{
DeleteOnRead: false,
PollInterval: time.Minute,
PollTimeout: time.Second * 30,
BatchSize: defaultBatchSize,
PageSize: defaultPageSize,
}
}

Expand Down
5 changes: 2 additions & 3 deletions receiver/azureblobrehydrationreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test_createDefaultConfig(t *testing.T) {
expectedCfg := &Config{
DeleteOnRead: false,
PollInterval: time.Minute,
PollTimeout: time.Second * 30,
BatchSize: defaultBatchSize,
PageSize: defaultPageSize,
}

componentCfg := createDefaultConfig()
Expand Down
Loading