Change Queue interface to return a callback instead of an index (#12230)
This change will make it possible to significantly simplify the implementation for
the case where the queue is disabled and the caller must block until the batch is
constructed and processed downstream.

Updates #8122

It is acceptable to make this a breaking change since Queue is still
experimental.
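
For illustration only, here is a minimal sketch (not part of this commit) of the consumer side under the new interface. Queue, DoneCallback, and Read come from this change; drain and consumeFunc are hypothetical names.

package sketch

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterqueue"
)

// drain is a hypothetical consumer loop: it reads items and reports the
// processing result through the returned callback instead of passing an
// index back to the queue.
func drain[T any](q exporterqueue.Queue[T], consumeFunc func(context.Context, T) error) {
	for {
		// Read blocks until an item is available or the queue is stopped.
		ctx, req, done, ok := q.Read(context.Background())
		if !ok {
			return
		}
		// Previously: index, ctx, req, ok := q.Read(...); q.OnProcessingFinished(index, err)
		done(consumeFunc(ctx, req))
	}
}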

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
bogdandrutu authored Feb 1, 2025
1 parent 0a40b4e commit 4ee2b50
Showing 10 changed files with 84 additions and 63 deletions.
25 changes: 25 additions & 0 deletions .chloggen/done-callback.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterqueue

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change Queue interface to return a callback instead of an index

# One or more tracking issues or pull requests related to the change
issues: [8122]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
10 changes: 4 additions & 6 deletions exporter/exporterqueue/bounded_memory_queue.go
@@ -11,6 +11,8 @@ import (
"go.opentelemetry.io/collector/component"
)

var noopDone DoneCallback = func(error) {}

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer are dropped.
@@ -34,11 +36,7 @@ func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
}
}

func (q *boundedMemoryQueue[T]) Read(context.Context) (uint64, context.Context, T, bool) {
func (q *boundedMemoryQueue[T]) Read(context.Context) (context.Context, T, DoneCallback, bool) {
ctx, req, ok := q.sizedQueue.pop()
return 0, ctx, req, ok
return ctx, req, noopDone, ok
}

// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
// For in memory queue, this function is noop.
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {}
8 changes: 4 additions & 4 deletions exporter/exporterqueue/bounded_memory_queue_test.go
@@ -183,11 +183,11 @@ func TestZeroSizeNoConsumers(t *testing.T) {
}

func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool {
index, ctx, req, ok := q.Read(context.Background())
ctx, req, done, ok := q.Read(context.Background())
if !ok {
return false
}
q.OnProcessingFinished(index, consumeFunc(ctx, req))
done(consumeFunc(ctx, req))
return true
}

@@ -203,11 +203,11 @@ func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(cont
go func() {
defer ac.stopWG.Done()
for {
index, ctx, req, ok := q.Read(context.Background())
ctx, req, done, ok := q.Read(context.Background())
if !ok {
return
}
q.OnProcessingFinished(index, consumeFunc(ctx, req))
done(consumeFunc(ctx, req))
}
}()
}
12 changes: 6 additions & 6 deletions exporter/exporterqueue/persistent_queue.go
@@ -273,14 +273,14 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
return nil
}

func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context, T, bool) {
func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, DoneCallback, bool) {
pq.mu.Lock()
defer pq.mu.Unlock()

for {
if pq.stopped {
var req T
return 0, context.Background(), req, false
return context.Background(), req, nil, false
}

// Read until either a successful retrieved element or no more elements in the storage.
@@ -301,7 +301,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context
}
pq.hasMoreSpace.Signal()

return index, context.Background(), req, true
return context.Background(), req, func(processErr error) { pq.onDone(index, processErr) }, true
}
}

@@ -312,7 +312,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (uint64, context.Context
}

// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
// finished, the index should be called with OnProcessingFinished to clean up the storage. If no new item is available,
// finished, the index should be called with onDone to clean up the storage. If no new item is available,
// returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
index := pq.readIndex
@@ -347,8 +347,8 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
return index, request, true
}

// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr error) {
// onDone should be called to remove the item of the given index from the queue once processing is finished.
func (pq *persistentQueue[T]) onDone(index uint64, consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
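
The persistent queue shows why returning a callback is sufficient: the storage index is now captured in a closure rather than exposed to callers. Below is a simplified sketch of that pattern with illustrative names (indexedQueue, read, onDone); it is not the real implementation, which also talks to a storage client and tracks dispatched items.

package sketch

import "sync"

// DoneCallback mirrors the callback type introduced by this change.
type DoneCallback func(processErr error)

// indexedQueue is a simplified stand-in for the persistent queue.
type indexedQueue[T any] struct {
	mu        sync.Mutex
	readIndex uint64
	items     map[uint64]T
}

// read pops the next item and returns a callback that captures the storage
// index, so callers never need to hand the index back for cleanup.
func (q *indexedQueue[T]) read() (T, DoneCallback, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	req, ok := q.items[q.readIndex]
	if !ok {
		var zero T
		return zero, nil, false
	}
	index := q.readIndex
	q.readIndex++
	return req, func(processErr error) { q.onDone(index, processErr) }, true
}

// onDone deletes the processed item; a real implementation may keep failed
// items for retry depending on configuration.
func (q *indexedQueue[T]) onDone(index uint64, processErr error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if processErr == nil {
		delete(q.items, index)
	}
}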
10 changes: 5 additions & 5 deletions exporter/exporterqueue/persistent_queue_test.go
@@ -659,19 +659,19 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})

// Takes index 0 in process.
_, _, readReq, found := ps.Read(context.Background())
_, readReq, _, found := ps.Read(context.Background())
require.True(t, found)
assert.Equal(t, req, readReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// This takes item 1 to process.
secondIndex, _, secondReadReq, found := ps.Read(context.Background())
_, secondReadReq, secondDone, found := ps.Read(context.Background())
require.True(t, found)
assert.Equal(t, req, secondReadReq)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

// Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
ps.OnProcessingFinished(secondIndex, nil)
secondDone(nil)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
@@ -910,12 +910,12 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {

require.NoError(t, ps.Offer(context.Background(), uint64(50)))

index, _, _, ok := ps.Read(context.Background())
_, _, done, ok := ps.Read(context.Background())
require.True(t, ok)
assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed())
require.NoError(t, ps.Shutdown(context.Background()))
assert.False(t, ps.client.(*storagetest.MockStorageClient).IsClosed())
ps.OnProcessingFinished(index, nil)
done(nil)
assert.True(t, ps.client.(*storagetest.MockStorageClient).IsClosed())
}

19 changes: 12 additions & 7 deletions exporter/exporterqueue/queue.go
@@ -12,11 +12,18 @@ import (
"go.opentelemetry.io/collector/pipeline"
)

// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full.
// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full and setup to
// not block.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
var ErrQueueIsFull = errors.New("sending queue is full")

// DoneCallback represents the callback that will be called when the read request is completely processed by the
// downstream components.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type DoneCallback func(processErr error)

// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue
// (boundedMemoryQueue) or via a disk-based queue (persistentQueue)
// Experimental: This API is at the early stage of development and may change without backward compatibility
@@ -31,13 +38,11 @@ type Queue[T any] interface {
Size() int64
// Capacity returns the capacity of the queue.
Capacity() int64
// Read pulls the next available item from the queue along with its index. Once processing is
// finished, the index should be called with OnProcessingFinished to clean up the storage.
// Read pulls the next available item from the queue along with its done callback. Once processing is
// finished, the done callback must be called to clean up the storage.
// The function blocks until an item is available or if the queue is stopped.
// Returns false if reading failed or if the queue is stopped.
Read(context.Context) (uint64, context.Context, T, bool)
// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
OnProcessingFinished(index uint64, consumeErr error)
// If the queue is stopped returns false, otherwise true.
Read(context.Context) (context.Context, T, DoneCallback, bool)
}

// Settings defines settings for creating a queue.
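
As the commit description notes, the callback shape also makes a blocking, disabled-queue mode straightforward to build. The following is a hypothetical sketch, not part of this commit, in which Offer blocks until the done callback fires; syncQueue and syncItem are made-up names.

package sketch

import "context"

// DoneCallback mirrors the callback type introduced by this change.
type DoneCallback func(processErr error)

type syncItem[T any] struct {
	ctx    context.Context
	req    T
	result chan error
}

// syncQueue is a hypothetical "disabled queue": Offer hands the request
// directly to the reader and blocks until processing completes downstream.
type syncQueue[T any] struct {
	ch chan syncItem[T]
}

// Offer blocks until the consumer invokes the done callback for this item.
func (q *syncQueue[T]) Offer(ctx context.Context, req T) error {
	result := make(chan error, 1)
	q.ch <- syncItem[T]{ctx: ctx, req: req, result: result}
	return <-result
}

// Read matches the new interface shape: the returned callback unblocks the
// producer with the processing error.
func (q *syncQueue[T]) Read(context.Context) (context.Context, T, DoneCallback, bool) {
	item, ok := <-q.ch
	if !ok {
		var zero T
		return context.Background(), zero, nil, false
	}
	return item.ctx, item.req, func(processErr error) { item.result <- processErr }, true
}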
14 changes: 7 additions & 7 deletions exporter/internal/queue/batcher.go
@@ -14,9 +14,9 @@ import (
)

type batch struct {
ctx context.Context
req internal.Request
idxList []uint64
ctx context.Context
req internal.Request
dones []exporterqueue.DoneCallback
}

// Batcher is in charge of reading items from the queue and send them out asynchronously.
@@ -68,16 +68,16 @@ func newBaseBatcher(batchCfg exporterbatcher.Config,
}

// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary.
func (qb *BaseBatcher) flush(batchToFlush batch) {
func (qb *BaseBatcher) flush(ctx context.Context, req internal.Request, dones []exporterqueue.DoneCallback) {
qb.stopWG.Add(1)
if qb.workerPool != nil {
<-qb.workerPool
}
go func() {
defer qb.stopWG.Done()
err := qb.exportFunc(batchToFlush.ctx, batchToFlush.req)
for _, idx := range batchToFlush.idxList {
qb.queue.OnProcessingFinished(idx, err)
err := qb.exportFunc(ctx, req)
for _, done := range dones {
done(err)
}
if qb.workerPool != nil {
qb.workerPool <- true
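
The batcher now carries a slice of done callbacks instead of a slice of indices, and flush fans the single export result out to all of them. A condensed sketch of that fan-out follows; flushBatch and exportFunc are assumed names, not the batcher's actual API.

package sketch

import "context"

// DoneCallback mirrors the callback type introduced by this change.
type DoneCallback func(processErr error)

// flushBatch sends one merged request downstream and reports the shared
// result to every producer whose request was merged into the batch.
func flushBatch[T any](
	ctx context.Context,
	req T,
	dones []DoneCallback,
	exportFunc func(context.Context, T) error,
) {
	err := exportFunc(ctx, req)
	for _, done := range dones {
		// Each original queue item is released (or retried by the queue)
		// based on the same export outcome.
		done(err)
	}
}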
5 changes: 2 additions & 3 deletions exporter/internal/queue/consumers.go
@@ -36,12 +36,11 @@ func (qc *Consumers[T]) Start(_ context.Context, _ component.Host) error {
startWG.Done()
defer qc.stopWG.Done()
for {
index, ctx, req, ok := qc.queue.Read(context.Background())
ctx, req, done, ok := qc.queue.Read(context.Background())
if !ok {
return
}
consumeErr := qc.consumeFunc(ctx, req)
qc.queue.OnProcessingFinished(index, consumeErr)
done(qc.consumeFunc(ctx, req))
}
}()
}
35 changes: 16 additions & 19 deletions exporter/internal/queue/default_batcher.go
@@ -10,6 +10,7 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
)

@@ -35,7 +36,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
defer qb.stopWG.Done()
for {
// Read() blocks until the queue is non-empty or until the queue is stopped.
idx, ctx, req, ok := qb.queue.Read(context.Background())
ctx, req, done, ok := qb.queue.Read(context.Background())
if !ok {
qb.shutdownCh <- true
return
@@ -54,7 +55,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
}

if mergeSplitErr != nil || reqList == nil {
qb.queue.OnProcessingFinished(idx, mergeSplitErr)
done(mergeSplitErr)
qb.currentBatchMu.Unlock()
continue
}
@@ -64,42 +65,38 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flush(batch{
req: reqList[i],
ctx: ctx,
idxList: []uint64{idx},
})
qb.flush(ctx, reqList[i], []exporterqueue.DoneCallback{done})
// TODO: handle partial failure
}
qb.resetTimer()
} else {
qb.currentBatch = &batch{
req: reqList[0],
ctx: ctx,
idxList: []uint64{idx},
req: reqList[0],
ctx: ctx,
dones: []exporterqueue.DoneCallback{done},
}
qb.currentBatchMu.Unlock()
}
} else {
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
qb.currentBatch = &batch{
req: req,
ctx: ctx,
idxList: []uint64{idx},
req: req,
ctx: ctx,
dones: []exporterqueue.DoneCallback{done},
}
} else {
// TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified
mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req)
if mergeErr != nil {
qb.queue.OnProcessingFinished(idx, mergeErr)
done(mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
req: mergedReq[0],
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx),
req: mergedReq[0],
ctx: qb.currentBatch.ctx,
dones: append(qb.currentBatch.dones, done),
}
}

@@ -109,7 +106,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.currentBatchMu.Unlock()

// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush)
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.dones)
qb.resetTimer()
} else {
qb.currentBatchMu.Unlock()
@@ -168,7 +165,7 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() {
qb.currentBatchMu.Unlock()

// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush)
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.dones)
qb.resetTimer()
}

Expand Down
9 changes: 3 additions & 6 deletions exporter/internal/queue/disabled_batcher.go
@@ -7,6 +7,7 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterqueue"
)

// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will
@@ -29,15 +30,11 @@ func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
go func() {
defer qb.stopWG.Done()
for {
idx, _, req, ok := qb.queue.Read(context.Background())
_, req, done, ok := qb.queue.Read(context.Background())
if !ok {
return
}
qb.flush(batch{
req: req,
ctx: context.Background(),
idxList: []uint64{idx},
})
qb.flush(context.Background(), req, []exporterqueue.DoneCallback{done})
}
}()
return nil
