diff --git a/.chloggen/add-blocking-option.yaml b/.chloggen/add-blocking-option.yaml new file mode 100644 index 00000000000..fa2bd7ddc59 --- /dev/null +++ b/.chloggen/add-blocking-option.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add blocking option to control queue behavior when full + +# One or more tracking issues or pull requests related to the change +issues: [12090] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 0020c25c1d9..e62c9d4c419 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -17,6 +17,7 @@ The following configuration options can be modified: - `enabled` (default = true) - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false` - `queue_size` (default = 1000): Maximum number of batches kept in memory before dropping; ignored if `enabled` is `false` + - `blocking` (default = false): If true blocks until queue has space for the request otherwise returns immediately; ignored if `enabled` is `false` User should calculate this as `num_seconds * requests_per_second / requests_per_batch` where: - `num_seconds` is the number of seconds to buffer in case of a backend outage - `requests_per_second` is the average number of requests per seconds diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 416d11264c0..da68c37092c 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -222,6 +222,7 @@ func WithQueue(config QueueConfig) Option { Enabled: config.Enabled, NumConsumers: config.NumConsumers, QueueSize: config.QueueSize, + Blocking: config.Blocking, } o.queueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{ Marshaler: o.Marshaler, diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index c1cf3848641..66628e6a357 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -20,8 +20,6 @@ import ( "go.opentelemetry.io/collector/exporter/internal/queue" ) -const defaultQueueSize = 1000 - // QueueConfig defines configuration for queueing batches before sending to the consumerSender. type QueueConfig struct { // Enabled indicates whether to not enqueue batches before sending to the consumerSender. @@ -32,6 +30,9 @@ type QueueConfig struct { NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` + // Blocking controls the queue behavior when full. + // If true it blocks until enough space to add the new request to the queue. + Blocking bool `mapstructure:"blocking"` // StorageID if not empty, enables the persistent storage and uses the component specified // as a storage extension for the persistent queue StorageID *component.ID `mapstructure:"storage"` @@ -45,7 +46,8 @@ func NewDefaultQueueConfig() QueueConfig { // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue // This can be estimated at 1-4 GB worth of maximum memory usage // This default is probably still too high, and may be adjusted further down in a future release - QueueSize: defaultQueueSize, + QueueSize: 1_000, + Blocking: false, } } diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 10959a0ef83..695f83b0514 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -261,7 +261,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize))) + require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(1000))) for i := 0; i < 7; i++ { require.NoError(t, be.Send(context.Background(), newErrorRequest())) @@ -271,7 +271,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { assert.NoError(t, be.Shutdown(context.Background())) // metrics should be unregistered at shutdown to prevent memory leak - require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize))) + require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(1000))) require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7), attribute.String(DataTypeKey, dataType.String()))) } diff --git a/exporter/exporterqueue/config.go b/exporter/exporterqueue/config.go index 538c08b08d8..f68599a0d17 100644 --- a/exporter/exporterqueue/config.go +++ b/exporter/exporterqueue/config.go @@ -20,6 +20,9 @@ type Config struct { NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of requests allowed in queue at any given time. QueueSize int `mapstructure:"queue_size"` + // Blocking controls the queue behavior when full. + // If true it blocks until enough space to add the new request to the queue. + Blocking bool `mapstructure:"blocking"` } // NewDefaultConfig returns the default Config. @@ -30,6 +33,7 @@ func NewDefaultConfig() Config { Enabled: true, NumConsumers: 10, QueueSize: 1_000, + Blocking: true, } } diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index 7e995a464ee..549f1ce7635 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -69,6 +69,7 @@ func NewMemoryQueueFactory[T any]() Factory[T] { return newBoundedMemoryQueue[T](memoryQueueSettings[T]{ sizer: &requestSizer[T]{}, capacity: int64(cfg.QueueSize), + blocking: cfg.Blocking, }) } } @@ -95,6 +96,7 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P return newPersistentQueue[T](persistentQueueSettings[T]{ sizer: &requestSizer[T]{}, capacity: int64(cfg.QueueSize), + blocking: cfg.Blocking, signal: set.Signal, storageID: *storageID, marshaler: factorySettings.Marshaler,