Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use standard component Start/Shutdown func in exporterhelper #8861

Merged
merged 1 commit into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,19 @@ import (

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
shutdown(ctx context.Context) error
component.Component
send(req internal.Request) error
setNextSender(nextSender requestSender)
}

type baseRequestSender struct {
component.StartFunc
component.ShutdownFunc
nextSender requestSender
}

var _ requestSender = (*baseRequestSender)(nil)

func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error {
return nil
}

func (b *baseRequestSender) shutdown(context.Context) error {
return nil
}

func (b *baseRequestSender) send(req internal.Request) error {
return b.nextSender.send(req)
}
Expand All @@ -53,10 +46,7 @@ type errorLoggingRequestSender struct {
func (l *errorLoggingRequestSender) send(req internal.Request) error {
err := l.baseRequestSender.send(req)
if err != nil {
l.logger.Error(
"Exporting failed",
zap.Error(err),
)
l.logger.Error("Exporting failed", zap.Error(err))
}
return err
}
Expand Down Expand Up @@ -135,10 +125,10 @@ func WithQueue(config QueueSettings) Option {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler, o.set)
}
}
qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger)
qs := newQueueSender(o.set, o.signal, queue)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
}
Expand Down Expand Up @@ -231,18 +221,17 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
}

// If no error then start the queueSender.
return be.queueSender.start(ctx, host, be.set)
return be.queueSender.Start(ctx, host)
}

func (be *baseExporter) Shutdown(ctx context.Context) error {
// First shutdown the retry sender, so it can push any pending requests to back the queue.
err := be.retrySender.shutdown(ctx)

// Then shutdown the queue sender.
err = multierr.Append(err, be.queueSender.shutdown(ctx))

// Last shutdown the wrapped exporter itself.
return multierr.Append(err, be.ShutdownFunc.Shutdown(ctx))
return multierr.Combine(
// First shutdown the retry sender, so it can push any pending requests to back the queue.
be.retrySender.Shutdown(ctx),
// Then shutdown the queue sender.
be.queueSender.Shutdown(ctx),
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
}

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func newNopQueueSettings(callback func(item Request)) QueueSettings {
return QueueSettings{
CreateSettings: exportertest.NewNopCreateSettings(),
DataType: component.DataTypeMetrics,
Callback: callback,
DataType: component.DataTypeMetrics,
Callback: callback,
}
}

Expand Down
11 changes: 7 additions & 4 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

Expand All @@ -29,6 +30,7 @@ var (
type persistentQueue struct {
stopWG sync.WaitGroup
stopChan chan struct{}
set exporter.CreateSettings
storageID component.ID
storage *persistentContiguousStorage
capacity uint64
Expand All @@ -45,10 +47,11 @@ func buildPersistentStorageName(name string, signal component.DataType) string {

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler,
unmarshaler RequestUnmarshaler) Queue {
unmarshaler RequestUnmarshaler, set exporter.CreateSettings) Queue {
return &persistentQueue{
capacity: uint64(capacity),
numConsumers: numConsumers,
set: set,
storageID: storageID,
marshaler: marshaler,
unmarshaler: unmarshaler,
Expand All @@ -58,12 +61,12 @@ func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID,

// Start starts the persistentQueue with the given number of consumers.
func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set QueueSettings) error {
storageClient, err := toStorageClient(ctx, pq.storageID, host, set.ID, set.DataType)
storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, set.DataType)
if err != nil {
return err
}
storageName := buildPersistentStorageName(set.ID.Name(), set.DataType)
pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler)
storageName := buildPersistentStorageName(pq.set.ID.Name(), set.DataType)
pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, pq.set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler)
for i := 0; i < pq.numConsumers; i++ {
pq.stopWG.Add(1)
go func() {
Expand Down
7 changes: 4 additions & 3 deletions exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -34,7 +35,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
// createTestQueue creates and starts a fake queue with the given capacity and number of consumers.
func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item Request)) Queue {
pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newFakeTracesRequestMarshalerFunc(),
newFakeTracesRequestUnmarshalerFunc())
newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
host := &mockHost{ext: map[component.ID]component.Component{
{}: NewMockStorageExtension(nil),
}}
Expand All @@ -45,7 +46,7 @@ func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(ite

func TestPersistentQueue_Capacity(t *testing.T) {
pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(),
newFakeTracesRequestUnmarshalerFunc())
newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
host := &mockHost{ext: map[component.ID]component.Component{
{}: NewMockStorageExtension(nil),
}}
Expand Down Expand Up @@ -280,7 +281,7 @@ func TestInvalidStorageExtensionType(t *testing.T) {

func TestPersistentQueue_StopAfterBadStart(t *testing.T) {
pq := NewPersistentQueue(1, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(),
newFakeTracesRequestUnmarshalerFunc())
newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
// verify that stopping a un-start/started w/error queue does not panic
assert.NoError(t, pq.Shutdown(context.Background()))
}
2 changes: 0 additions & 2 deletions exporter/exporterhelper/internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
)

type QueueSettings struct {
exporter.CreateSettings
DataType component.DataType
Callback func(item Request)
}
Expand Down
49 changes: 25 additions & 24 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,21 @@ type queueSender struct {
queue internal.Queue
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
requeuingEnabled bool

metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
}

func newQueueSender(id component.ID, signal component.DataType, queue internal.Queue, logger *zap.Logger) *queueSender {
func newQueueSender(set exporter.CreateSettings, signal component.DataType, queue internal.Queue) *queueSender {
return &queueSender{
fullName: id.String(),
fullName: set.ID.String(),
signal: signal,
queue: queue,
traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()),
logger: logger,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: queue != nil && queue.IsPersistent(),
}
Expand Down Expand Up @@ -122,15 +124,14 @@ func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Reque
return err
}

// start is invoked during service startup.
func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error {
// Start is invoked during service startup.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
if qs.queue == nil {
return nil
}

err := qs.queue.Start(ctx, host, internal.QueueSettings{
CreateSettings: set,
DataType: qs.signal,
DataType: qs.signal,
Callback: func(item internal.Request) {
_ = qs.nextSender.send(item)
item.OnProcessingFinished()
Expand All @@ -141,17 +142,17 @@ func (qs *queueSender) start(ctx context.Context, host component.Host, set expor
}

if obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled() {
return qs.recordWithOtel(set.MeterProvider.Meter(scopeName))
return qs.recordWithOtel()
}
return qs.recordWithOC()
}

func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error {
func (qs *queueSender) recordWithOtel() error {
var err, errs error

attrs := otelmetric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.fullName)))

qs.metricSize, err = meter.Int64ObservableGauge(
qs.metricSize, err = qs.meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_size",
otelmetric.WithDescription("Current size of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
Expand All @@ -162,7 +163,7 @@ func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error {
)
errs = multierr.Append(errs, err)

qs.metricCapacity, err = meter.Int64ObservableGauge(
qs.metricCapacity, err = qs.meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_capacity",
otelmetric.WithDescription("Fixed capacity of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
Expand Down Expand Up @@ -193,19 +194,19 @@ func (qs *queueSender) recordWithOC() error {
return nil
}

// shutdown is invoked during service shutdown.
func (qs *queueSender) shutdown(ctx context.Context) error {
if qs.queue != nil {
// Cleanup queue metrics reporting
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
}, metricdata.NewLabelValue(qs.fullName))

// Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.
return qs.queue.Shutdown(ctx)
// Shutdown is invoked during service shutdown.
func (qs *queueSender) Shutdown(ctx context.Context) error {
if qs.queue == nil {
return nil
}
return nil
// Cleanup queue metrics reporting
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
}, metricdata.NewLabelValue(qs.fullName))

// Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.
return qs.queue.Shutdown(ctx)
}

// send implements the requestSender interface
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onT
}
}

func (rs *retrySender) shutdown(context.Context) error {
func (rs *retrySender) Shutdown(context.Context) error {
close(rs.stopCh)
return nil
}
Expand Down
Loading