From fd0fecc56e3ace88cacbc54207e0814fc71a7753 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Sun, 12 Nov 2023 14:54:07 -0800 Subject: [PATCH] Use standard component Start/Shutdown func in exporterhelper Signed-off-by: Bogdan Drutu --- exporter/exporterhelper/common.go | 39 ++++++--------- .../internal/bounded_memory_queue_test.go | 6 +-- .../internal/persistent_queue.go | 11 +++-- .../internal/persistent_queue_test.go | 7 +-- exporter/exporterhelper/internal/queue.go | 2 - exporter/exporterhelper/queue_sender.go | 49 ++++++++++--------- exporter/exporterhelper/retry_sender.go | 2 +- 7 files changed, 53 insertions(+), 63 deletions(-) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 06134aae2f5..9736ec59c2b 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -17,26 +17,19 @@ import ( // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { - start(ctx context.Context, host component.Host, set exporter.CreateSettings) error - shutdown(ctx context.Context) error + component.Component send(req internal.Request) error setNextSender(nextSender requestSender) } type baseRequestSender struct { + component.StartFunc + component.ShutdownFunc nextSender requestSender } var _ requestSender = (*baseRequestSender)(nil) -func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error { - return nil -} - -func (b *baseRequestSender) shutdown(context.Context) error { - return nil -} - func (b *baseRequestSender) send(req internal.Request) error { return b.nextSender.send(req) } @@ -53,10 +46,7 @@ type errorLoggingRequestSender struct { func (l *errorLoggingRequestSender) send(req internal.Request) error { err := l.baseRequestSender.send(req) if err != nil { - l.logger.Error( - "Exporting failed", - zap.Error(err), - ) + l.logger.Error("Exporting failed", zap.Error(err)) } return err } @@ -135,10 +125,10 @@ func WithQueue(config QueueSettings) Option { if config.StorageID == nil { queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) } else { - queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) + queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler, o.set) } } - qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger) + qs := newQueueSender(o.set, o.signal, queue) o.queueSender = qs o.setOnTemporaryFailure(qs.onTemporaryFailure) } @@ -231,18 +221,17 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { } // If no error then start the queueSender. - return be.queueSender.start(ctx, host, be.set) + return be.queueSender.Start(ctx, host) } func (be *baseExporter) Shutdown(ctx context.Context) error { - // First shutdown the retry sender, so it can push any pending requests to back the queue. - err := be.retrySender.shutdown(ctx) - - // Then shutdown the queue sender. - err = multierr.Append(err, be.queueSender.shutdown(ctx)) - - // Last shutdown the wrapped exporter itself. - return multierr.Append(err, be.ShutdownFunc.Shutdown(ctx)) + return multierr.Combine( + // First shutdown the retry sender, so it can push any pending requests to back the queue. + be.retrySender.Shutdown(ctx), + // Then shutdown the queue sender. + be.queueSender.Shutdown(ctx), + // Last shutdown the wrapped exporter itself. + be.ShutdownFunc.Shutdown(ctx)) } func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) { diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index 39c433658f1..881aa7a9f07 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -19,14 +19,12 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/exporter/exportertest" ) func newNopQueueSettings(callback func(item Request)) QueueSettings { return QueueSettings{ - CreateSettings: exportertest.NewNopCreateSettings(), - DataType: component.DataTypeMetrics, - Callback: callback, + DataType: component.DataTypeMetrics, + Callback: callback, } } diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index c40313f9c22..7ed72c5df3b 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -10,6 +10,7 @@ import ( "sync" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -29,6 +30,7 @@ var ( type persistentQueue struct { stopWG sync.WaitGroup stopChan chan struct{} + set exporter.CreateSettings storageID component.ID storage *persistentContiguousStorage capacity uint64 @@ -45,10 +47,11 @@ func buildPersistentStorageName(name string, signal component.DataType) string { // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler, - unmarshaler RequestUnmarshaler) Queue { + unmarshaler RequestUnmarshaler, set exporter.CreateSettings) Queue { return &persistentQueue{ capacity: uint64(capacity), numConsumers: numConsumers, + set: set, storageID: storageID, marshaler: marshaler, unmarshaler: unmarshaler, @@ -58,12 +61,12 @@ func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, // Start starts the persistentQueue with the given number of consumers. func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set QueueSettings) error { - storageClient, err := toStorageClient(ctx, pq.storageID, host, set.ID, set.DataType) + storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, set.DataType) if err != nil { return err } - storageName := buildPersistentStorageName(set.ID.Name(), set.DataType) - pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler) + storageName := buildPersistentStorageName(pq.set.ID.Name(), set.DataType) + pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, pq.set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler) for i := 0; i < pq.numConsumers; i++ { pq.stopWG.Add(1) go func() { diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 25241c6a8f4..d8556a36ca1 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/pdata/pcommon" @@ -34,7 +35,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component { // createTestQueue creates and starts a fake queue with the given capacity and number of consumers. func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item Request)) Queue { pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newFakeTracesRequestMarshalerFunc(), - newFakeTracesRequestUnmarshalerFunc()) + newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings()) host := &mockHost{ext: map[component.ID]component.Component{ {}: NewMockStorageExtension(nil), }} @@ -45,7 +46,7 @@ func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(ite func TestPersistentQueue_Capacity(t *testing.T) { pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(), - newFakeTracesRequestUnmarshalerFunc()) + newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings()) host := &mockHost{ext: map[component.ID]component.Component{ {}: NewMockStorageExtension(nil), }} @@ -280,7 +281,7 @@ func TestInvalidStorageExtensionType(t *testing.T) { func TestPersistentQueue_StopAfterBadStart(t *testing.T) { pq := NewPersistentQueue(1, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(), - newFakeTracesRequestUnmarshalerFunc()) + newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings()) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/exporterhelper/internal/queue.go index 52dfdefc603..0ffc834a6f7 100644 --- a/exporter/exporterhelper/internal/queue.go +++ b/exporter/exporterhelper/internal/queue.go @@ -9,11 +9,9 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter" ) type QueueSettings struct { - exporter.CreateSettings DataType component.DataType Callback func(item Request) } diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 0b11db6c4ec..13ff4aff09e 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -79,19 +79,21 @@ type queueSender struct { queue internal.Queue traceAttribute attribute.KeyValue logger *zap.Logger + meter otelmetric.Meter requeuingEnabled bool metricCapacity otelmetric.Int64ObservableGauge metricSize otelmetric.Int64ObservableGauge } -func newQueueSender(id component.ID, signal component.DataType, queue internal.Queue, logger *zap.Logger) *queueSender { +func newQueueSender(set exporter.CreateSettings, signal component.DataType, queue internal.Queue) *queueSender { return &queueSender{ - fullName: id.String(), + fullName: set.ID.String(), signal: signal, queue: queue, - traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), - logger: logger, + traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), + logger: set.TelemetrySettings.Logger, + meter: set.TelemetrySettings.MeterProvider.Meter(scopeName), // TODO: this can be further exposed as a config param rather than relying on a type of queue requeuingEnabled: queue != nil && queue.IsPersistent(), } @@ -122,15 +124,14 @@ func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Reque return err } -// start is invoked during service startup. -func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { +// Start is invoked during service startup. +func (qs *queueSender) Start(ctx context.Context, host component.Host) error { if qs.queue == nil { return nil } err := qs.queue.Start(ctx, host, internal.QueueSettings{ - CreateSettings: set, - DataType: qs.signal, + DataType: qs.signal, Callback: func(item internal.Request) { _ = qs.nextSender.send(item) item.OnProcessingFinished() @@ -141,17 +142,17 @@ func (qs *queueSender) start(ctx context.Context, host component.Host, set expor } if obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled() { - return qs.recordWithOtel(set.MeterProvider.Meter(scopeName)) + return qs.recordWithOtel() } return qs.recordWithOC() } -func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error { +func (qs *queueSender) recordWithOtel() error { var err, errs error attrs := otelmetric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.fullName))) - qs.metricSize, err = meter.Int64ObservableGauge( + qs.metricSize, err = qs.meter.Int64ObservableGauge( obsmetrics.ExporterKey+"/queue_size", otelmetric.WithDescription("Current size of the retry queue (in batches)"), otelmetric.WithUnit("1"), @@ -162,7 +163,7 @@ func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error { ) errs = multierr.Append(errs, err) - qs.metricCapacity, err = meter.Int64ObservableGauge( + qs.metricCapacity, err = qs.meter.Int64ObservableGauge( obsmetrics.ExporterKey+"/queue_capacity", otelmetric.WithDescription("Fixed capacity of the retry queue (in batches)"), otelmetric.WithUnit("1"), @@ -193,19 +194,19 @@ func (qs *queueSender) recordWithOC() error { return nil } -// shutdown is invoked during service shutdown. -func (qs *queueSender) shutdown(ctx context.Context) error { - if qs.queue != nil { - // Cleanup queue metrics reporting - _ = globalInstruments.queueSize.UpsertEntry(func() int64 { - return int64(0) - }, metricdata.NewLabelValue(qs.fullName)) - - // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only - // try once every request. - return qs.queue.Shutdown(ctx) +// Shutdown is invoked during service shutdown. +func (qs *queueSender) Shutdown(ctx context.Context) error { + if qs.queue == nil { + return nil } - return nil + // Cleanup queue metrics reporting + _ = globalInstruments.queueSize.UpsertEntry(func() int64 { + return int64(0) + }, metricdata.NewLabelValue(qs.fullName)) + + // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only + // try once every request. + return qs.queue.Shutdown(ctx) } // send implements the requestSender interface diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index 51828ba1818..72c95f94db2 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -100,7 +100,7 @@ func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onT } } -func (rs *retrySender) shutdown(context.Context) error { +func (rs *retrySender) Shutdown(context.Context) error { close(rs.stopCh) return nil }