Skip to content

Commit

Permalink
fix: refactor azureeventhubrehydrationreceiver to stream blobs as to …
Browse files Browse the repository at this point in the history
…not lock up on larger environments (BPOP-831) (#2098)

* pre tests working; refactor to stream blobs as to not lock up

* fix tests

* remove polling parameters and fix gosec error

* some more tests

* add license

* remove extra debug line

* address PR feedback

* only log messages rather than submit validation errors

* spin up goroutine per blob in the batch, change default batch size to 30 to keep a moderate default

* add buffered chan size of 5 to start

* remove 3 empty request limit

* remove testing log line

* harden shutdown logic to close channel with a timeout

* fix lint

* dakota PR feedback

* more feedback; sans checkpointing after every blob

* minor updates; pr review
  • Loading branch information
schmikei authored Jan 17, 2025
1 parent ad217b3 commit 9a225bf
Show file tree
Hide file tree
Showing 10 changed files with 580 additions and 306 deletions.
52 changes: 37 additions & 15 deletions receiver/azureblobrehydrationreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,32 @@ This is not a traditional receiver that continually produces data but rather reh
- Traces

## How it works
1. The receiver polls blob storage for all blobs in the specified container.
1. The receiver polls blob storage for pages of blobs in the specified container.
2. The receiver will parse each blob's path to determine if it matches a path created by the [Azure Blob Exporter](../../exporter/azureblobexporter/README.md#blob-path).
3. If the blob path is from the exporter, the receiver will parse the timestamp represented by the path.
4. If the timestamp is within the configured range the receiver will download the blob and parse its contents into OTLP data.

a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip.

> Note: There is no current way of specifying a time range to rehydrate so any blobs outside of the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration.
## Configuration
| Field | Type | Default | Required | Description |
|--------------------|-----------|------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. |
| container | string | | `true` | The name of the container to rehydrate from. |
| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. |
| starting_time | string | | `true ` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| ending_time | string | | `true ` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| delete_on_read | bool | `false` | `false ` | If `true` the blob will be deleted after being rehydrated. |
| poll_interval | string | `1m` | `false ` | How often to read a new set of blobs. This value is mostly to control how often the blob API is called to ensure once rehydration is done the receiver isn't making too many API calls. |
| poll_timeout | string | `30s` | `false ` | The timeout used when reading blobs from Azure. |
| storage | string | | `false ` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. |

| Field | Type | Default | Required | Description |
|-------|------|---------|----------|-------------|
| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. |
| container | string | | `true` | The name of the container to rehydrate from. |
| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. |
| starting_time | string | | `true` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| ending_time | string | | `true` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| delete_on_read | bool | `false` | `false` | If `true` the blob will be deleted after being rehydrated. |
| storage | string | | `false` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. |
| poll_interval* | string | | `false` | The interval at which the Azure API is scanned for blobs. |
| poll_timeout* | string | | `false` | The timeout for the Azure API to scan for blobs. |
| batch_size | int | `30` | `false` | The number of blobs to download and process in the pipeline simultaneously. This parameter directly impacts performance by controlling the concurrent blob download limit. |
| page_size | int | `1000` | `false` | The maximum number of blob information to request in a single API call. |

> Deprecated*: `poll_interval` and `poll_timeout` are no longer supported and `batch_size`/`page_size` should be used instead.
## Example Configuration

Expand All @@ -41,6 +48,7 @@ This configuration specifies a `connection_string`, `container`, `starting_time`
This will rehydrate all blobs in the container `my-container` that have a path that represents they were created between `1:00pm` and `2:30pm` UTC time on `October 1, 2023`.

Such a path could look like the following:

```
year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json
year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json
Expand All @@ -53,25 +61,26 @@ azureblobrehydration:
container: "my-container"
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
batch_size: 100
page_size: 1000
```
### Using Storage Extension Configuration
This configuration shows using a storage extension to track rehydration progress over agent restarts. The `storage` field is set to the component ID of the storage extension.


```yaml
extensions:
file_storage:
directory: $OIQ_OTEL_COLLECTOR_HOME/storage
receivers:
azureblobrehydration:
connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
container: "my-container"
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
storage: "file_storage"
batch_size: 100
page_size: 1000
```

### Root Folder Configuration
Expand All @@ -93,6 +102,8 @@ azureblobrehydration:
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
root_folder: "root"
batch_size: 100
page_size: 1000
```

### Delete on read Configuration
Expand All @@ -106,4 +117,15 @@ azureblobrehydration:
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
delete_on_read: true
batch_size: 100
page_size: 1000
```

## Deprecated Configuration

The following configuration fields are deprecated and will be removed in a future release.

| Field | Deprecated |
|-------|----------|
| poll_interval | true |
| poll_timeout | true |
23 changes: 17 additions & 6 deletions receiver/azureblobrehydrationreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (

// Config is the configuration for the azure blob rehydration receiver
type Config struct {
// BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 30)
// This number directly affects the number of goroutines that will be created to process the blobs.
BatchSize int `mapstructure:"batch_size"`

// ConnectionString is the Azure Blob Storage connection key,
// which can be found in the Azure Blob Storage resource on the Azure Portal. (no default)
ConnectionString string `mapstructure:"connection_string"`
Expand All @@ -45,19 +49,30 @@ type Config struct {
// Default value of false
DeleteOnRead bool `mapstructure:"delete_on_read"`

// Deprecated: PollInterval is no longer required due to streaming blobs for processing.
// if a value is provided, validation will throw an error
// PollInterval is the interval at which the Azure API is scanned for blobs.
// Default value of 1m
PollInterval time.Duration `mapstructure:"poll_interval"`

// Deprecated: PollTimeout is no longer required due to streaming blobs for processing.
// if a value is provided, validation will throw an error
// PollTimeout is the timeout for the Azure API to scan for blobs.
PollTimeout time.Duration `mapstructure:"poll_timeout"`

// PageSize is the number of blobs to request from the Azure API at a time. (default 1000)
PageSize int `mapstructure:"page_size"`

// ID of the storage extension to use for storing progress
StorageID *component.ID `mapstructure:"storage"`
}

// Validate validates the config
func (c *Config) Validate() error {
if c.BatchSize < 1 {
return errors.New("batch_size must be greater than 0")
}

if c.ConnectionString == "" {
return errors.New("connection_string is required")
}
Expand All @@ -81,12 +96,8 @@ func (c *Config) Validate() error {
return errors.New("ending_time must be at least one minute after starting_time")
}

if c.PollInterval < time.Second {
return errors.New("poll_interval must be at least one second")
}

if c.PollTimeout < time.Second {
return errors.New("poll_timeout must be at least one second")
if c.PageSize < 1 {
return errors.New("page_size must be greater than 0")
}

return nil
Expand Down
64 changes: 39 additions & 25 deletions receiver/azureblobrehydrationreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote
import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -37,8 +36,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("connection_string is required"),
},
Expand All @@ -51,8 +50,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("container is required"),
},
Expand All @@ -65,8 +64,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("starting_time is invalid: missing value"),
},
Expand All @@ -79,8 +78,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("ending_time is invalid: missing value"),
},
Expand All @@ -93,8 +92,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "invalid_time",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("starting_time is invalid: invalid timestamp"),
},
Expand All @@ -107,8 +106,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "invalid_time",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("ending_time is invalid: invalid timestamp"),
},
Expand All @@ -121,38 +120,53 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T16:00",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("ending_time must be at least one minute after starting_time"),
},
{
desc: "Bad poll_interval",
desc: "with deprecated poll_interval",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Millisecond,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: errors.New("poll_interval must be at least one second"),
// expect no error until future release where poll_interval is removed
expectErr: nil,
},
{
desc: "Bad batch_size",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
BatchSize: 0,
PageSize: 1000,
},
expectErr: errors.New("batch_size must be greater than 0"),
},
{
desc: "Bad poll_timeout",
desc: "Bad page_size",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second * 2,
PollTimeout: time.Millisecond,
BatchSize: 30,
PageSize: 0,
},
expectErr: errors.New("poll_timeout must be at least one second"),
expectErr: errors.New("page_size must be greater than 0"),
},
{
desc: "Valid config",
Expand All @@ -163,8 +177,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 30,
PageSize: 1000,
},
expectErr: nil,
},
Expand Down
8 changes: 5 additions & 3 deletions receiver/azureblobrehydrationreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote
import (
"context"
"errors"
"time"

"github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/metadata"
"go.opentelemetry.io/collector/component"
Expand All @@ -28,6 +27,9 @@ import (
// errImproperCfgType error for when an invalid config type is passed to receiver creation funcs
var errImproperCfgType = errors.New("improper config type")

const defaultBatchSize = 30
const defaultPageSize = 1000

// NewFactory creates a new receiver factory
func NewFactory() receiver.Factory {
return receiver.NewFactory(
Expand All @@ -43,8 +45,8 @@ func NewFactory() receiver.Factory {
func createDefaultConfig() component.Config {
return &Config{
DeleteOnRead: false,
PollInterval: time.Minute,
PollTimeout: time.Second * 30,
BatchSize: defaultBatchSize,
PageSize: defaultPageSize,
}
}

Expand Down
5 changes: 2 additions & 3 deletions receiver/azureblobrehydrationreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test_createDefaultConfig(t *testing.T) {
expectedCfg := &Config{
DeleteOnRead: false,
PollInterval: time.Minute,
PollTimeout: time.Second * 30,
BatchSize: defaultBatchSize,
PageSize: defaultPageSize,
}

componentCfg := createDefaultConfig()
Expand Down
Loading

0 comments on commit 9a225bf

Please sign in to comment.