
Commit

Remove memory leak at exporter shutdown (open-telemetry#11745)
#### Description

Fix a memory leak at exporter shutdown. At startup, the exporter creates
metric callbacks that hold references to the exporter queue; these callbacks
need to be unregistered at shutdown.
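
For context, this is the standard pattern for asynchronous instruments in the OpenTelemetry Go metric API: `RegisterCallback` returns a `metric.Registration`, and the callback (plus anything it captures, such as the queue) stays reachable until `Unregister` is called. A minimal, self-contained sketch of that register/unregister pattern; the `queue` type and names here are illustrative, not the exporterhelper code:

```go
package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// queue is an illustrative stand-in for the exporter queue.
type queue struct{ items []any }

func (q *queue) Size() int { return len(q.items) }

func main() {
	meter := otel.Meter("example")
	q := &queue{}

	// Asynchronous gauge observed through a callback that captures q.
	gauge, err := meter.Int64ObservableGauge("exporter_queue_size")
	if err != nil {
		log.Fatal(err)
	}
	reg, err := meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(gauge, int64(q.Size()))
		return nil
	}, gauge)
	if err != nil {
		log.Fatal(err)
	}

	// Without this Unregister, the meter keeps the callback (and q) alive
	// after the component that owns q has shut down.
	if err := reg.Unregister(); err != nil {
		log.Println("unregister:", err)
	}
}
```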

#### Link to tracking issue
Fixes open-telemetry#11401

---------

Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
madaraszg-tulip and codeboten authored Dec 6, 2024
1 parent 52d8a1a commit 71f7d9e
Showing 7 changed files with 80 additions and 26 deletions.
25 changes: 25 additions & 0 deletions .chloggen/exporter-shutdown-memory-leak.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak at exporter shutdown

# One or more tracking issues or pull requests related to the change
issues: [11401]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion cmd/mdatagen/internal/samplereceiver/factory.go
@@ -48,5 +48,5 @@ type nopReceiver struct {
}

func (r nopReceiver) initOptionalMetric() {
_ = r.telemetryBuilder.InitQueueLength(func() int64 { return 1 })
_, _ = r.telemetryBuilder.InitQueueLength(func() int64 { return 1 })
}

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions cmd/mdatagen/internal/templates/telemetry.go.tmpl
@@ -56,7 +56,7 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) {
{{- range $name, $metric := .Telemetry.Metrics }}
{{- if $metric.Optional }}
// Init{{ $name.Render }} configures the {{ $name.Render }} metric.
func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}{{- end }}, opts ...metric.ObserveOption) error {
func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async -}}cb func() {{ $metric.Data.BasicType }}{{- end }}, opts ...metric.ObserveOption) (metric.Registration, error) {
var err error
builder.{{ $name.Render }}, err = builder.meter.{{ $metric.Data.Instrument }}(
"otelcol_{{ $name }}",
@@ -68,14 +68,14 @@ func (builder *TelemetryBuilder) Init{{ $name.Render }}({{ if $metric.Data.Async
)
{{- if $metric.Data.Async }}
if err != nil {
return err
return nil, err
}
_, err = builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.Observe{{ casesTitle $metric.Data.BasicType }}(builder.{{ $name.Render }}, cb(), opts...)
return nil
}, builder.{{ $name.Render }})
{{- end }}
return err
return reg, err
}

{{- else }}
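
For reference, a rough sketch of what the updated template plausibly renders to for an async optional metric such as the exporter queue size. The authoritative output is the regenerated exporter/exporterhelper/internal/metadata/generated_telemetry.go listed below, which GitHub does not render here; the field name and the omitted instrument options in this sketch are assumptions:

```go
package metadata

import (
	"context"

	"go.opentelemetry.io/otel/metric"
)

// Minimal stand-in for the generated TelemetryBuilder; field names are assumptions.
type TelemetryBuilder struct {
	meter             metric.Meter
	ExporterQueueSize metric.Int64ObservableGauge
}

// Assumed shape of the regenerated InitExporterQueueSize, derived from the template above.
func (builder *TelemetryBuilder) InitExporterQueueSize(cb func() int64, opts ...metric.ObserveOption) (metric.Registration, error) {
	var err error
	builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge(
		"otelcol_exporter_queue_size",
		// description and unit options come from metadata.yaml; omitted here
	)
	if err != nil {
		return nil, err
	}
	// Returning the Registration lets the caller Unregister the callback at shutdown.
	reg, err := builder.meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		o.ObserveInt64(builder.ExporterQueueSize, cb(), opts...)
		return nil
	}, builder.ExporterQueueSize)
	return reg, err
}
```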
16 changes: 8 additions & 8 deletions exporter/exporterhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default.

43 changes: 34 additions & 9 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -10,7 +10,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
@@ -75,8 +74,10 @@ type QueueSender struct {
batcher queue.Batcher
consumers *queue.Consumers[internal.Request]

obsrep *ObsReport
exporterID component.ID
obsrep *ObsReport
exporterID component.ID
logger *zap.Logger
shutdownFns []component.ShutdownFunc
}

func NewQueueSender(
@@ -92,6 +93,7 @@ func NewQueueSender(
traceAttribute: attribute.String(ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
logger: set.Logger,
}

exportFunc := func(ctx context.Context, req internal.Request) error {
@@ -127,18 +129,41 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
}

dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String())
return multierr.Append(
qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))),
qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))),
)

reg1, err1 := qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr)))

if reg1 != nil {
qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error {
return reg1.Unregister()
})
}

reg2, err2 := qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute)))

if reg2 != nil {
qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error {
return reg2.Unregister()
})
}

return errors.Join(err1, err2)
}

// Shutdown is invoked during service shutdown.
func (qs *QueueSender) Shutdown(ctx context.Context) error {
// Stop the queue and consumers. This drains the queue and calls the retry sender (already stopped),
// so each request is tried only once.

for _, fn := range qs.shutdownFns {
err := fn(ctx)
if err != nil {
qs.logger.Warn("Error while shutting down QueueSender", zap.Error(err))
}
}
qs.shutdownFns = nil

if err := qs.queue.Shutdown(ctx); err != nil {
return err
}
4 changes: 4 additions & 0 deletions exporter/exporterhelper/internal/queue_sender_test.go
@@ -269,6 +269,10 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
attribute.String(DataTypeKey, dataType.String())))

assert.NoError(t, be.Shutdown(context.Background()))
// metrics should be unregistered at shutdown to prevent memory leak
require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize)))
require.Error(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7),
attribute.String(DataTypeKey, dataType.String())))
}
})
}
