Use standard component Start/Shutdown func in exporterhelper (open-telemetry#8861)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
bogdandrutu authored Nov 13, 2023
1 parent 7f09297 commit 23500dd
Showing 7 changed files with 53 additions and 63 deletions.
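For context, the "standard" Start/Shutdown this commit switches to is the lifecycle contract defined in the collector's component package: every component implements Start and Shutdown, and the StartFunc/ShutdownFunc helper types let a struct satisfy those methods by embedding, with a nil function behaving as a no-op. The sketch below is a condensed paraphrase for orientation only; see go.opentelemetry.io/collector/component for the authoritative definitions.

package component

import "context"

// Component is the shared lifecycle contract for collector components.
type Component interface {
	// Start begins the component's work; host gives access to other components.
	Start(ctx context.Context, host Host) error
	// Shutdown stops the component and releases its resources.
	Shutdown(ctx context.Context) error
}

// StartFunc adapts a function (or nil) into the Start method.
type StartFunc func(context.Context, Host) error

// Start calls the wrapped function; a nil StartFunc is a no-op.
func (f StartFunc) Start(ctx context.Context, host Host) error {
	if f == nil {
		return nil
	}
	return f(ctx, host)
}

// ShutdownFunc adapts a function (or nil) into the Shutdown method.
type ShutdownFunc func(context.Context) error

// Shutdown calls the wrapped function; a nil ShutdownFunc is a no-op.
func (f ShutdownFunc) Shutdown(ctx context.Context) error {
	if f == nil {
		return nil
	}
	return f(ctx)
}

Because the zero value of StartFunc and ShutdownFunc already behaves as a no-op, embedding them (as baseRequestSender does below) makes hand-written no-op start/shutdown methods redundant, which is exactly what this change removes.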
39 changes: 14 additions & 25 deletions exporter/exporterhelper/common.go
@@ -17,26 +17,19 @@ import (

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
- start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
- shutdown(ctx context.Context) error
+ component.Component
send(req internal.Request) error
setNextSender(nextSender requestSender)
}

type baseRequestSender struct {
+ component.StartFunc
+ component.ShutdownFunc
nextSender requestSender
}

var _ requestSender = (*baseRequestSender)(nil)

- func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error {
- return nil
- }
-
- func (b *baseRequestSender) shutdown(context.Context) error {
- return nil
- }
-
func (b *baseRequestSender) send(req internal.Request) error {
return b.nextSender.send(req)
}
@@ -53,10 +46,7 @@ type errorLoggingRequestSender struct {
func (l *errorLoggingRequestSender) send(req internal.Request) error {
err := l.baseRequestSender.send(req)
if err != nil {
- l.logger.Error(
- "Exporting failed",
- zap.Error(err),
- )
+ l.logger.Error("Exporting failed", zap.Error(err))
}
return err
}
@@ -135,10 +125,10 @@ func WithQueue(config QueueSettings) Option {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
- queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
+ queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler, o.set)
}
}
- qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger)
+ qs := newQueueSender(o.set, o.signal, queue)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
}
@@ -231,18 +221,17 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
}

// If no error then start the queueSender.
- return be.queueSender.start(ctx, host, be.set)
+ return be.queueSender.Start(ctx, host)
}

func (be *baseExporter) Shutdown(ctx context.Context) error {
- // First shutdown the retry sender, so it can push any pending requests to back the queue.
- err := be.retrySender.shutdown(ctx)
-
- // Then shutdown the queue sender.
- err = multierr.Append(err, be.queueSender.shutdown(ctx))
-
- // Last shutdown the wrapped exporter itself.
- return multierr.Append(err, be.ShutdownFunc.Shutdown(ctx))
+ return multierr.Combine(
+ // First shutdown the retry sender, so it can push any pending requests to back the queue.
+ be.retrySender.Shutdown(ctx),
+ // Then shutdown the queue sender.
+ be.queueSender.Shutdown(ctx),
+ // Last shutdown the wrapped exporter itself.
+ be.ShutdownFunc.Shutdown(ctx))
}

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
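As an illustration of the pattern the new baseRequestSender enables (a hypothetical sender, not part of this commit): a concrete sender embeds baseRequestSender to inherit the no-op lifecycle methods and overrides only the hook it needs, which is how retrySender and queueSender are wired further down in this diff.

// exampleSender is a hypothetical sender: Start is inherited from the
// embedded baseRequestSender (via component.StartFunc), Shutdown is overridden.
type exampleSender struct {
	baseRequestSender
	stopCh chan struct{}
}

func (s *exampleSender) Shutdown(context.Context) error {
	close(s.stopCh)
	return nil
}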
6 changes: 2 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
@@ -19,14 +19,12 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func newNopQueueSettings(callback func(item Request)) QueueSettings {
return QueueSettings{
- CreateSettings: exportertest.NewNopCreateSettings(),
- DataType: component.DataTypeMetrics,
- Callback: callback,
+ DataType: component.DataTypeMetrics,
+ Callback: callback,
}
}

11 changes: 7 additions & 4 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -10,6 +10,7 @@ import (
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

@@ -29,6 +30,7 @@ var (
type persistentQueue struct {
stopWG sync.WaitGroup
stopChan chan struct{}
+ set exporter.CreateSettings
storageID component.ID
storage *persistentContiguousStorage
capacity uint64
@@ -45,10 +47,11 @@ func buildPersistentStorageName(name string, signal component.DataType) string {

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler,
- unmarshaler RequestUnmarshaler) Queue {
+ unmarshaler RequestUnmarshaler, set exporter.CreateSettings) Queue {
return &persistentQueue{
capacity: uint64(capacity),
numConsumers: numConsumers,
+ set: set,
storageID: storageID,
marshaler: marshaler,
unmarshaler: unmarshaler,
@@ -58,12 +61,12 @@ func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID,

// Start starts the persistentQueue with the given number of consumers.
func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set QueueSettings) error {
- storageClient, err := toStorageClient(ctx, pq.storageID, host, set.ID, set.DataType)
+ storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, set.DataType)
if err != nil {
return err
}
- storageName := buildPersistentStorageName(set.ID.Name(), set.DataType)
- pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler)
+ storageName := buildPersistentStorageName(pq.set.ID.Name(), set.DataType)
+ pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, pq.set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler)
for i := 0; i < pq.numConsumers; i++ {
pq.stopWG.Add(1)
go func() {
7 changes: 4 additions & 3 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -34,7 +35,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
// createTestQueue creates and starts a fake queue with the given capacity and number of consumers.
func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item Request)) Queue {
pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newFakeTracesRequestMarshalerFunc(),
- newFakeTracesRequestUnmarshalerFunc())
+ newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
host := &mockHost{ext: map[component.ID]component.Component{
{}: NewMockStorageExtension(nil),
}}
@@ -45,7 +46,7 @@ func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(ite

func TestPersistentQueue_Capacity(t *testing.T) {
pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(),
- newFakeTracesRequestUnmarshalerFunc())
+ newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
host := &mockHost{ext: map[component.ID]component.Component{
{}: NewMockStorageExtension(nil),
}}
@@ -280,7 +281,7 @@ func TestInvalidStorageExtensionType(t *testing.T) {

func TestPersistentQueue_StopAfterBadStart(t *testing.T) {
pq := NewPersistentQueue(1, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(),
- newFakeTracesRequestUnmarshalerFunc())
+ newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
// verify that stopping a un-start/started w/error queue does not panic
assert.NoError(t, pq.Shutdown(context.Background()))
}
2 changes: 0 additions & 2 deletions exporter/exporterhelper/internal/queue.go
@@ -9,11 +9,9 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
)

type QueueSettings struct {
- exporter.CreateSettings
DataType component.DataType
Callback func(item Request)
}
49 changes: 25 additions & 24 deletions exporter/exporterhelper/queue_sender.go
@@ -79,19 +79,21 @@
queue internal.Queue
traceAttribute attribute.KeyValue
logger *zap.Logger
+ meter otelmetric.Meter
requeuingEnabled bool

metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
}

- func newQueueSender(id component.ID, signal component.DataType, queue internal.Queue, logger *zap.Logger) *queueSender {
+ func newQueueSender(set exporter.CreateSettings, signal component.DataType, queue internal.Queue) *queueSender {
return &queueSender{
- fullName: id.String(),
+ fullName: set.ID.String(),
signal: signal,
queue: queue,
- traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()),
- logger: logger,
+ traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
+ logger: set.TelemetrySettings.Logger,
+ meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: queue != nil && queue.IsPersistent(),
}
@@ -122,15 +124,14 @@ func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Reque
return err
}

- // start is invoked during service startup.
- func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error {
+ // Start is invoked during service startup.
+ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
if qs.queue == nil {
return nil
}

err := qs.queue.Start(ctx, host, internal.QueueSettings{
- CreateSettings: set,
- DataType: qs.signal,
+ DataType: qs.signal,
Callback: func(item internal.Request) {
_ = qs.nextSender.send(item)
item.OnProcessingFinished()
@@ -141,17 +142,17 @@ func (qs *queueSender) start(ctx context.Context, host component.Host, set expor
}

if obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled() {
- return qs.recordWithOtel(set.MeterProvider.Meter(scopeName))
+ return qs.recordWithOtel()
}
return qs.recordWithOC()
}

- func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error {
+ func (qs *queueSender) recordWithOtel() error {
var err, errs error

attrs := otelmetric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.fullName)))

- qs.metricSize, err = meter.Int64ObservableGauge(
+ qs.metricSize, err = qs.meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_size",
otelmetric.WithDescription("Current size of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
@@ -162,7 +163,7 @@ func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error {
)
errs = multierr.Append(errs, err)

- qs.metricCapacity, err = meter.Int64ObservableGauge(
+ qs.metricCapacity, err = qs.meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_capacity",
otelmetric.WithDescription("Fixed capacity of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
@@ -193,19 +194,19 @@ func (qs *queueSender) recordWithOC() error {
return nil
}

- // shutdown is invoked during service shutdown.
- func (qs *queueSender) shutdown(ctx context.Context) error {
- if qs.queue != nil {
- // Cleanup queue metrics reporting
- _ = globalInstruments.queueSize.UpsertEntry(func() int64 {
- return int64(0)
- }, metricdata.NewLabelValue(qs.fullName))
-
- // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only
- // try once every request.
- return qs.queue.Shutdown(ctx)
+ // Shutdown is invoked during service shutdown.
+ func (qs *queueSender) Shutdown(ctx context.Context) error {
+ if qs.queue == nil {
+ return nil
}
- return nil
+ // Cleanup queue metrics reporting
+ _ = globalInstruments.queueSize.UpsertEntry(func() int64 {
+ return int64(0)
+ }, metricdata.NewLabelValue(qs.fullName))
+
+ // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only
+ // try once every request.
+ return qs.queue.Shutdown(ctx)
}

// send implements the requestSender interface
2 changes: 1 addition & 1 deletion exporter/exporterhelper/retry_sender.go
@@ -100,7 +100,7 @@ func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onT
}
}

- func (rs *retrySender) shutdown(context.Context) error {
+ func (rs *retrySender) Shutdown(context.Context) error {
close(rs.stopCh)
return nil
}
