Skip to content

Commit

Permalink
Add blocking option to control queue behavior when full
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jan 14, 2025
1 parent 71aae79 commit cc2374a
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 5 deletions.
25 changes: 25 additions & 0 deletions .chloggen/add-blocking-option.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add blocking option to control queue behavior when full

# One or more tracking issues or pull requests related to the change
issues: [12090]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions exporter/exporterhelper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The following configuration options can be modified:
- `enabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 1000): Maximum number of batches kept in memory before dropping; ignored if `enabled` is `false`
- `blocking` (default = false): If true, blocks until the queue has space for the request; otherwise, returns immediately when the queue is full. Ignored if `enabled` is `false`
The user should calculate this as `num_seconds * requests_per_second / requests_per_batch` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per second
Expand Down
1 change: 1 addition & 0 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func WithQueue(config QueueConfig) Option {
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
Blocking: config.Blocking,
}
o.queueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{
Marshaler: o.Marshaler,
Expand Down
8 changes: 5 additions & 3 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"go.opentelemetry.io/collector/exporter/internal/queue"
)

const defaultQueueSize = 1000

// QueueConfig defines configuration for queueing batches before sending to the consumerSender.
type QueueConfig struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Expand All @@ -32,6 +30,9 @@ type QueueConfig struct {
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of batches allowed in queue at a given time.
QueueSize int `mapstructure:"queue_size"`
// Blocking controls the queue behavior when full.
// If true, adding a request blocks until there is enough space in the queue for it.
Blocking bool `mapstructure:"blocking"`
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
Expand All @@ -45,7 +46,8 @@ func NewDefaultQueueConfig() QueueConfig {
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
// This can be estimated at 1-4 GB worth of maximum memory usage
// This default is probably still too high, and may be adjusted further down in a future release
QueueSize: defaultQueueSize,
QueueSize: 1_000,
Blocking: false,
}
}

Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(1000)))

for i := 0; i < 7; i++ {
require.NoError(t, be.Send(context.Background(), newErrorRequest()))
Expand All @@ -271,7 +271,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {

assert.NoError(t, be.Shutdown(context.Background()))
// metrics should be unregistered at shutdown to prevent memory leak
require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(1000)))
require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7),
attribute.String(DataTypeKey, dataType.String())))
}
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterqueue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type Config struct {
NumConsumers int `mapstructure:"num_consumers"`
// QueueSize is the maximum number of requests allowed in queue at any given time.
QueueSize int `mapstructure:"queue_size"`
// Blocking controls the queue behavior when full.
// If true, adding a request blocks until there is enough space in the queue for it.
Blocking bool `mapstructure:"blocking"`
}

// NewDefaultConfig returns the default Config.
Expand All @@ -30,6 +33,7 @@ func NewDefaultConfig() Config {
Enabled: true,
NumConsumers: 10,
QueueSize: 1_000,
Blocking: true,
}
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/exporterqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func NewMemoryQueueFactory[T any]() Factory[T] {
return newBoundedMemoryQueue[T](memoryQueueSettings[T]{
sizer: &requestSizer[T]{},
capacity: int64(cfg.QueueSize),
blocking: cfg.Blocking,
})
}
}
Expand All @@ -95,6 +96,7 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P
return newPersistentQueue[T](persistentQueueSettings[T]{
sizer: &requestSizer[T]{},
capacity: int64(cfg.QueueSize),
blocking: cfg.Blocking,
signal: set.Signal,
storageID: *storageID,
marshaler: factorySettings.Marshaler,
Expand Down

0 comments on commit cc2374a

Please sign in to comment.