diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index 41e885af0cf..f6369b778ca 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -133,7 +133,7 @@ func (qs *queueSender) consume(ctx context.Context, req Request) {
 		return
 	}
 
-	if qs.queue.Offer(ctx, req) == nil {
+	if qs.queue.Offer(ctx, extractPartialRequest(req, err)) == nil {
 		qs.logger.Error(
 			"Exporting failed. Putting back to the end of the queue.",
 			zap.Error(err),
diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go
index 7893b427019..0b76915d20d 100644
--- a/exporter/exporterhelper/queue_sender_test.go
+++ b/exporter/exporterhelper/queue_sender_test.go
@@ -16,13 +16,11 @@ import (
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/consumer/consumererror"
 	"go.opentelemetry.io/collector/exporter"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/featuregate"
 	"go.opentelemetry.io/collector/internal/obsreportconfig"
-	"go.opentelemetry.io/collector/internal/testdata"
 	"go.opentelemetry.io/collector/obsreport/obsreporttest"
 )
 
@@ -240,8 +238,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
 		assert.NoError(t, be.Shutdown(context.Background()))
 	})
 
-	traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1))
-	mockR := newMockRequest(1, traceErr)
+	mockR := newMockRequest(4, errors.New("transient error"))
 	ocs.run(func() {
 		ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing
 		// This is asynchronous so it should just enqueue, no errors expected.
@@ -251,8 +248,9 @@
 
 	// In the newMockConcurrentExporter we count requests and items even for failed requests
 	mockR.checkNumRequests(t, 2)
+	// Ensure that only 1 item was sent, which corresponds to the items count in the error returned by mockRequest.OnError().
 	ocs.checkSendItemsCount(t, 1)
-	ocs.checkDroppedItemsCount(t, 1) // not actually dropped, but ocs counts each failed send here
+	ocs.checkDroppedItemsCount(t, 4) // not actually dropped, but ocs counts each failed send here
 }
 
 // disabling retry sender should disable requeuing.
diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go
index 2c074f565ff..c29da3a10a3 100644
--- a/exporter/exporterhelper/request.go
+++ b/exporter/exporterhelper/request.go
@@ -41,3 +41,12 @@ type RequestMarshaler func(req Request) ([]byte, error)
 // This API is at the early stage of development and may change without backward compatibility
 // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
 type RequestUnmarshaler func(data []byte) (Request, error)
+
+// extractPartialRequest returns a new Request containing only the items left to be sent
+// if just some of the items failed to process and can be retried. Otherwise, it returns
+// the original Request.
+func extractPartialRequest(req Request, err error) Request {
+	if errReq, ok := req.(RequestErrorHandler); ok {
+		return errReq.OnError(err)
+	}
+	return req
+}
diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go
index 3cb3f775a91..e1ffb7b9388 100644
--- a/exporter/exporterhelper/retry_sender.go
+++ b/exporter/exporterhelper/retry_sender.go
@@ -132,11 +132,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
 		return err
 	}
 
-	// Give the request a chance to extract signal data to retry if only some data
-	// failed to process.
-	if errReq, ok := req.(RequestErrorHandler); ok {
-		req = errReq.OnError(err)
-	}
+	req = extractPartialRequest(req, err)
 
 	backoffDelay := expBackoff.NextBackOff()
 	if backoffDelay == backoff.Stop {
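
For context, here is a minimal sketch (not part of the diff) of how a pusher-based Request might implement RequestErrorHandler so that both the retry path and, with this change, the requeue path resend only the failed items. The tracesRequest type, its pusher field, and the example package are illustrative assumptions, as is the shape of the Request interface (Export plus ItemsCount); consumererror.Traces, its Data() accessor, and RequestErrorHandler are the real APIs referenced elsewhere in this diff.

package example

import (
	"context"
	"errors"

	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// tracesRequest is a hypothetical Request implementation used for illustration.
type tracesRequest struct {
	td     ptrace.Traces
	pusher func(context.Context, ptrace.Traces) error
}

// Export sends the traces downstream; on partial failure the pusher is
// expected to return a consumererror.Traces wrapping the failed subset.
func (r *tracesRequest) Export(ctx context.Context) error {
	return r.pusher(ctx, r.td)
}

// ItemsCount reports the number of spans held by this request.
func (r *tracesRequest) ItemsCount() int {
	return r.td.SpanCount()
}

// OnError implements exporterhelper.RequestErrorHandler: when the export error
// carries a consumererror.Traces payload, only the failed spans are kept for
// the retry/requeue; otherwise the full request is returned unchanged.
func (r *tracesRequest) OnError(err error) exporterhelper.Request {
	var traceErr consumererror.Traces
	if errors.As(err, &traceErr) {
		return &tracesRequest{td: traceErr.Data(), pusher: r.pusher}
	}
	return r
}

With extractPartialRequest now called in both retry_sender.go and queue_sender.go, such a request is trimmed to its failed items before it re-enters the persistent queue rather than only on the in-memory retry path, which is what the updated test asserts: 4 items fail on the first attempt, and only the 1 item extracted by OnError is counted as sent after the requeue.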