Skip to content

Commit

Permalink
[chore] [exporterhelper] Remove retry sender -> queue sender callback (
Browse files Browse the repository at this point in the history
…#8985)

Use returned error instead to simplify the senders feedback loop. This
change preserves the behavior. Re-enqueueing of the temporary failures
depends on the enabled retry sender. This will be changed in the next
step when re-queueing becomes a configurable option
  • Loading branch information
dmitryax authored Nov 27, 2023
1 parent c0deae5 commit 8cec790
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 47 deletions.
25 changes: 11 additions & 14 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func WithRetry(config RetrySettings) Option {
}
return
}
o.retrySender = newRetrySender(config, o.set, o.onTemporaryFailure)
o.retrySender = newRetrySender(config, o.set)
}
}

Expand All @@ -110,9 +110,7 @@ func WithQueue(config QueueSettings) Option {
}
return
}
qs := newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler)
}
}

Expand Down Expand Up @@ -146,9 +144,6 @@ type baseExporter struct {
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.

// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
onTemporaryFailure onRequestHandlingFinishedFunc

consumerOptions []consumer.Option
}

Expand Down Expand Up @@ -181,6 +176,15 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}
be.connectSenders()

// If retry sender is disabled then disable requeuing in the queue sender.
// TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender.
if qs, ok := be.queueSender.(*queueSender); ok {
// if it's not retrySender, then it is disabled.
if _, ok = be.retrySender.(*retrySender); !ok {
qs.requeuingEnabled = false
}
}

return be, nil
}

Expand Down Expand Up @@ -215,10 +219,3 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
}

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
be.onTemporaryFailure = onTemporaryFailure
if rs, ok := be.retrySender.(*retrySender); ok {
rs.onTemporaryFailure = onTemporaryFailure
}
}
27 changes: 16 additions & 11 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig"
Expand Down Expand Up @@ -114,37 +115,41 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
}
}

func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err error, logger *zap.Logger) error {
// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
func (qs *queueSender) consume(ctx context.Context, req Request) {
err := qs.nextSender.send(ctx, req)

// Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender.
if err == nil || consumererror.IsPermanent(err) {
return
}

if !qs.requeuingEnabled {
logger.Error(
qs.logger.Error(
"Exporting failed. No more retries left. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
return err
return
}

if qs.queue.Offer(ctx, req) == nil {
logger.Error(
qs.logger.Error(
"Exporting failed. Putting back to the end of the queue.",
zap.Error(err),
)
} else {
logger.Error(
qs.logger.Error(
"Exporting failed. Queue did not accept requeuing request. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
}
return err
}

// Start is invoked during service startup.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, func(ctx context.Context, req Request) {
// TODO: Update item.OnProcessingFinished to accept error and remove the retry->queue sender callback.
_ = qs.nextSender.send(ctx, req)
})
qs.consumers = internal.NewQueueConsumers(qs.queue, qs.numConsumers, qs.consume)
if err := qs.consumers.Start(ctx, host); err != nil {
return err
}
Expand Down Expand Up @@ -214,7 +219,7 @@ func (qs *queueSender) Shutdown(ctx context.Context) error {
return qs.consumers.Shutdown(ctx)
}

// send implements the requestSender interface
// send implements the requestSender interface. It puts the request in the queue.
func (qs *queueSender) send(ctx context.Context, req Request) error {
// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
Expand Down
33 changes: 33 additions & 0 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,39 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
ocs.checkDroppedItemsCount(t, 1) // not actually dropped, but ocs counts each failed send here
}

// disabling retry sender should disable requeuing.
func TestQueuedRetry_RequeuingDisabled(t *testing.T) {
mockR := newMockRequest(2, errors.New("transient error"))

// use persistent storage as it expected to be used with requeuing unless the retry sender is disabled
qCfg := NewDefaultQueueSettings()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false

be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)

var extensions = map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(nil),
}
host := &mockHost{ext: extensions}
require.NoError(t, be.Start(context.Background(), host))

ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.send(context.Background(), mockR))
})
ocs.awaitAsyncProcessing()

// one failed request, no retries, two items dropped.
mockR.checkNumRequests(t, 1)
ocs.checkSendItemsCount(t, 0)
ocs.checkDroppedItemsCount(t, 2)
}

// if requeueing is enabled, but the queue is full, we get an error
func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
Expand Down
34 changes: 12 additions & 22 deletions exporter/exporterhelper/retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,20 @@ func NewThrottleRetry(err error, delay time.Duration) error {
}
}

type onRequestHandlingFinishedFunc func(context.Context, Request, error, *zap.Logger) error

type retrySender struct {
baseRequestSender
traceAttribute attribute.KeyValue
cfg RetrySettings
stopCh chan struct{}
logger *zap.Logger
onTemporaryFailure onRequestHandlingFinishedFunc
traceAttribute attribute.KeyValue
cfg RetrySettings
stopCh chan struct{}
logger *zap.Logger
}

func newRetrySender(config RetrySettings, set exporter.CreateSettings, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender {
if onTemporaryFailure == nil {
onTemporaryFailure = func(_ context.Context, _ Request, err error, _ *zap.Logger) error {
return err
}
}
func newRetrySender(config RetrySettings, set exporter.CreateSettings) *retrySender {
return &retrySender{
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
cfg: config,
stopCh: make(chan struct{}),
logger: set.Logger,
onTemporaryFailure: onTemporaryFailure,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
cfg: config,
stopCh: make(chan struct{}),
logger: set.Logger,
}
}

Expand Down Expand Up @@ -126,6 +117,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
trace.WithAttributes(rs.traceAttribute, attribute.Int64("retry_num", retryNum)))

err := rs.nextSender.send(ctx, req)
rs.logger.Info("Exporting finished.", zap.Error(err))
if err == nil {
return nil
}
Expand All @@ -148,9 +140,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
// throw away the batch
err = fmt.Errorf("max elapsed time expired %w", err)
return rs.onTemporaryFailure(ctx, req, err, rs.logger)
return fmt.Errorf("max elapsed time expired %w", err)
}

throttleErr := throttleRetry{}
Expand Down Expand Up @@ -178,7 +168,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
case <-rs.stopCh:
return rs.onTemporaryFailure(ctx, req, fmt.Errorf("interrupted due to shutdown %w", err), rs.logger)
return fmt.Errorf("interrupted due to shutdown %w", err)
case <-time.After(backoffDelay):
}
}
Expand Down

0 comments on commit 8cec790

Please sign in to comment.