diff --git a/.chloggen/exporter-shutdown-memory-leak.yaml b/.chloggen/exporter-shutdown-memory-leak.yaml new file mode 100644 index 00000000000..3e432303c0a --- /dev/null +++ b/.chloggen/exporter-shutdown-memory-leak.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak at exporter shutdown + +# One or more tracking issues or pull requests related to the change +issues: [11401] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/mdatagen/internal/samplereceiver/factory.go b/cmd/mdatagen/internal/samplereceiver/factory.go index 7a3c9bb170d..e7445f9cb29 100644 --- a/cmd/mdatagen/internal/samplereceiver/factory.go +++ b/cmd/mdatagen/internal/samplereceiver/factory.go @@ -48,5 +48,5 @@ type nopReceiver struct { } func (r nopReceiver) initOptionalMetric() { - _ = r.telemetryBuilder.InitQueueLength(func() int64 { return 1 }) + _, _ = r.telemetryBuilder.InitQueueLength(func() int64 { return 1 }) } diff --git a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go index 71d4e1e78d6..99c7017b575 100644 --- a/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go +++ b/cmd/mdatagen/internal/samplereceiver/internal/metadata/generated_telemetry.go @@ -55,7 +55,7 @@ func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64, opts ...metric.O } // InitQueueLength configures the QueueLength metric. -func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric.ObserveOption) error { +func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) { var err error builder.QueueLength, err = builder.meter.Int64ObservableGauge( "otelcol_queue_length", @@ -63,13 +63,13 @@ func (builder *TelemetryBuilder) InitQueueLength(cb func() int64, opts ...metric metric.WithUnit("{items}"), ) if err != nil { - return err + return nil, err } - _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.QueueLength, cb(), opts...) return nil }, builder.QueueLength) - return err + return reg, err } // NewTelemetryBuilder provides a struct with methods to update all internal telemetry diff --git a/cmd/mdatagen/internal/templates/telemetry.go.tmpl b/cmd/mdatagen/internal/templates/telemetry.go.tmpl index 8b6c6e7d1dc..3f365e45167 100644 --- a/cmd/mdatagen/internal/templates/telemetry.go.tmpl +++ b/cmd/mdatagen/internal/templates/telemetry.go.tmpl @@ -56,7 +56,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { {{- range $name, $metric := .Telemetry.Metrics }} {{- if $metric.Optional }} // Init{{ $name.Render }} configures the {{ $name.Render }} metric. -func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}{{- end }}, opts ...metric.ObserveOption) error { +func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}{{- end }}, opts ...metric.ObserveOption) (metric.Registration, error) { var err error builder.{{ $name.Render }}, err = builder.meter.{{ $metric.Data.Instrument }}( "otelcol_{{ $name }}", @@ -68,14 +68,14 @@ func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async ) {{- if $metric.Data.Async }} if err != nil { - return err + return nil, err } - _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.Observe{{ casesTitle $metric.Data.BasicType }}(builder.{{ $name.Render }}, cb(), opts...) return nil }, builder.{{ $name.Render }}) {{- end }} - return err + return reg, err } {{- else }} diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index 9522b231122..41bea2f8ce1 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -51,7 +51,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { } // InitExporterQueueCapacity configures the ExporterQueueCapacity metric. -func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts ...metric.ObserveOption) error { +func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) { var err error builder.ExporterQueueCapacity, err = builder.meter.Int64ObservableGauge( "otelcol_exporter_queue_capacity", @@ -59,17 +59,17 @@ func (builder *TelemetryBuilder) InitExporterQueueCapacity(cb func() int64, opts metric.WithUnit("{batches}"), ) if err != nil { - return err + return nil, err } - _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.ExporterQueueCapacity, cb(), opts...) return nil }, builder.ExporterQueueCapacity) - return err + return reg, err } // InitExporterQueueSize configures the ExporterQueueSize metric. -func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...metric.ObserveOption) error { +func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) { var err error builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge( "otelcol_exporter_queue_size", @@ -77,13 +77,13 @@ func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ... metric.WithUnit("{batches}"), ) if err != nil { - return err + return nil, err } - _, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { + reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(builder.ExporterQueueSize, cb(), opts...) return nil }, builder.ExporterQueueSize) - return err + return reg, err } // NewTelemetryBuilder provides a struct with methods to update all internal telemetry diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index bc4273cd620..728ad8547ef 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -10,7 +10,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" - "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" @@ -75,8 +74,10 @@ type QueueSender struct { batcher queue.Batcher consumers *queue.Consumers[internal.Request] - obsrep *ObsReport - exporterID component.ID + obsrep *ObsReport + exporterID component.ID + logger *zap.Logger + shutdownFns []component.ShutdownFunc } func NewQueueSender( @@ -92,6 +93,7 @@ func NewQueueSender( traceAttribute: attribute.String(ExporterKey, set.ID.String()), obsrep: obsrep, exporterID: set.ID, + logger: set.Logger, } exportFunc := func(ctx context.Context, req internal.Request) error { @@ -127,18 +129,41 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { } dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String()) - return multierr.Append( - qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, - metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))), - qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, - metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))), - ) + + reg1, err1 := qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, + metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))) + + if reg1 != nil { + qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error { + return reg1.Unregister() + }) + } + + reg2, err2 := qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, + metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))) + + if reg2 != nil { + qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error { + return reg2.Unregister() + }) + } + + return errors.Join(err1, err2) } // Shutdown is invoked during service shutdown. func (qs *QueueSender) Shutdown(ctx context.Context) error { // Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. + + for _, fn := range qs.shutdownFns { + err := fn(ctx) + if err != nil { + qs.logger.Warn("Error while shutting down QueueSender", zap.Error(err)) + } + } + qs.shutdownFns = nil + if err := qs.queue.Shutdown(ctx); err != nil { return err } diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 41dbbdbc38d..112b9bcc4b6 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -269,6 +269,10 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { attribute.String(DataTypeKey, dataType.String()))) assert.NoError(t, be.Shutdown(context.Background())) + // metrics should be unregistered at shutdown to prevent memory leak + require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize))) + require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7), + attribute.String(DataTypeKey, dataType.String()))) } }) }