From e6df9d84d390dca64af38fcf3138f8b58073b692 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 10 Jan 2025 14:26:19 -0800 Subject: [PATCH] [chore] Cleanup sized channel, no more exceptions for persistent queue (#12069) Signed-off-by: Bogdan Drutu --- .../bounded_memory_queue_test.go | 111 ++++++++++-------- exporter/exporterqueue/sized_channel.go | 49 +++----- exporter/exporterqueue/sized_channel_test.go | 11 +- 3 files changed, 87 insertions(+), 84 deletions(-) diff --git a/exporter/exporterqueue/bounded_memory_queue_test.go b/exporter/exporterqueue/bounded_memory_queue_test.go index 578733bc6eb..f723f8263f8 100644 --- a/exporter/exporterqueue/bounded_memory_queue_test.go +++ b/exporter/exporterqueue/bounded_memory_queue_test.go @@ -99,55 +99,38 @@ func TestShutdownWhileNotEmpty(t *testing.T) { })) } -func Benchmark_QueueUsage_1000_requests(b *testing.B) { - benchmarkQueueUsage(b, &requestSizer[ptrace.Traces]{}, 1000) -} - -func Benchmark_QueueUsage_100000_requests(b *testing.B) { - benchmarkQueueUsage(b, &requestSizer[ptrace.Traces]{}, 100000) -} - -func Benchmark_QueueUsage_10000_items(b *testing.B) { - // each request has 10 items: 1000 requests = 10000 items - benchmarkQueueUsage(b, &itemsSizer{}, 1000) -} - -func Benchmark_QueueUsage_1M_items(b *testing.B) { - // each request has 10 items: 100000 requests = 1M items - benchmarkQueueUsage(b, &itemsSizer{}, 100000) -} - func TestQueueUsage(t *testing.T) { - t.Run("requests_based", func(t *testing.T) { - queueUsage(t, &requestSizer[ptrace.Traces]{}, 10) - }) - t.Run("items_based", func(t *testing.T) { - queueUsage(t, &itemsSizer{}, 10) - }) -} - -func benchmarkQueueUsage(b *testing.B, sizer sizer[ptrace.Traces], requestsCount int) { - b.ReportAllocs() - for i := 0; i < b.N; i++ { - queueUsage(b, sizer, requestsCount) + tests := []struct { + name string + sizer sizer[ptrace.Traces] + }{ + { + name: "requests_based", + sizer: &requestSizer[ptrace.Traces]{}, + }, + { + name: "items_based", + sizer: &itemsSizer{}, + }, } -} - -func queueUsage(tb testing.TB, sizer sizer[ptrace.Traces], requestsCount int) { - q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: sizer, capacity: int64(10 * requestsCount)}) - consumed := &atomic.Int64{} - require.NoError(tb, q.Start(context.Background(), componenttest.NewNopHost())) - ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error { - consumed.Add(1) - return nil - }) - td := testdata.GenerateTraces(10) - for j := 0; j < requestsCount; j++ { - require.NoError(tb, q.Offer(context.Background(), td)) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: tt.sizer, capacity: int64(100)}) + consumed := &atomic.Int64{} + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error { + consumed.Add(1) + return nil + }) + td := testdata.GenerateTraces(10) + for j := 0; j < 10; j++ { + require.NoError(t, q.Offer(context.Background(), td)) + } + assert.NoError(t, q.Shutdown(context.Background())) + assert.NoError(t, ac.Shutdown(context.Background())) + assert.Equal(t, int64(10), consumed.Load()) + }) } - assert.NoError(tb, q.Shutdown(context.Background())) - assert.NoError(tb, ac.Shutdown(context.Background())) - assert.Equal(tb, int64(requestsCount), consumed.Load()) } func TestZeroSizeNoConsumers(t *testing.T) { @@ -200,3 +183,39 @@ func (qc *asyncConsumer) Shutdown(_ context.Context) error { qc.stopWG.Wait() return nil } + +func BenchmarkOffer(b *testing.B) { + tests := []struct { + name string + sizer sizer[ptrace.Traces] + }{ + { + name: "requests_based", + sizer: &requestSizer[ptrace.Traces]{}, + }, + { + name: "items_based", + sizer: &itemsSizer{}, + }, + } + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: &requestSizer[ptrace.Traces]{}, capacity: int64(10 * b.N)}) + consumed := &atomic.Int64{} + require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost())) + ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error { + consumed.Add(1) + return nil + }) + td := testdata.GenerateTraces(10) + b.ResetTimer() + b.ReportAllocs() + for j := 0; j < b.N; j++ { + require.NoError(b, q.Offer(context.Background(), td)) + } + assert.NoError(b, q.Shutdown(context.Background())) + assert.NoError(b, ac.Shutdown(context.Background())) + assert.Equal(b, int64(b.N), consumed.Load()) + }) + } +} diff --git a/exporter/exporterqueue/sized_channel.go b/exporter/exporterqueue/sized_channel.go index aeae9e32e28..f27df7e295e 100644 --- a/exporter/exporterqueue/sized_channel.go +++ b/exporter/exporterqueue/sized_channel.go @@ -3,61 +3,50 @@ package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" -import "sync/atomic" +import ( + "errors" + "sync/atomic" +) + +var errInvalidSize = errors.New("invalid element size") // sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements. // The channel will accept elements until the total size of the elements reaches the capacity. type sizedChannel[T any] struct { sizer sizer[T] used *atomic.Int64 - - // We need to store the capacity in a separate field because the capacity of the channel can be higher. - // It happens when we restore a persistent queue from a disk that is bigger than the pre-configured capacity. - cap int64 - ch chan T + ch chan T } -// newSizedChannel creates a sized elements channel. Each element is assigned a size by the provided sizer. -// chanCapacity is the capacity of the underlying channel which usually should be equal to the capacity of the queue to -// avoid blocking the producer. Optionally, the channel can be preloaded with the elements and their total size. +// newSizedChannel creates a sized elements channel. Each element is assigned a positive size by the provided sizer. +// capacity is the total capacity of the queue. func newSizedChannel[T any](capacity int64, sizer sizer[T]) *sizedChannel[T] { - used := &atomic.Int64{} - - ch := make(chan T, capacity) return &sizedChannel[T]{ sizer: sizer, - used: used, - cap: capacity, - ch: ch, + used: &atomic.Int64{}, + ch: make(chan T, capacity), } } // push puts the element into the queue with the given sized if there is enough capacity. -// Returns an error if the queue is full. The callback is called before the element is committed to the queue. -// If the callback returns an error, the element is not put into the queue and the error is returned. -// The size is the size of the element MUST be positive. +// Returns an error if the queue is full. func (vcq *sizedChannel[T]) push(el T) error { elSize := vcq.sizer.Sizeof(el) - if vcq.used.Add(elSize) > vcq.cap { - vcq.used.Add(-elSize) - return ErrQueueIsFull + if elSize <= 0 { + return errInvalidSize } - - select { - // for persistent queue implementation, channel len can be out of sync with used size. Attempt to put it - // into the channel. If it is full, simply returns ErrQueueIsFull error. This prevents potential deadlock issues. - case vcq.ch <- el: - return nil - default: + if vcq.used.Add(elSize) > int64(cap(vcq.ch)) { vcq.used.Add(-elSize) return ErrQueueIsFull } + + vcq.ch <- el + return nil } // pop removes the element from the queue and returns it. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. -// The callback is called before the element is removed from the queue. It must return the size of the element. func (vcq *sizedChannel[T]) pop() (T, bool) { el, ok := <-vcq.ch if !ok { @@ -78,5 +67,5 @@ func (vcq *sizedChannel[T]) Size() int { } func (vcq *sizedChannel[T]) Capacity() int { - return int(vcq.cap) + return cap(vcq.ch) } diff --git a/exporter/exporterqueue/sized_channel_test.go b/exporter/exporterqueue/sized_channel_test.go index 9c42f4eaf4b..0ad73975b99 100644 --- a/exporter/exporterqueue/sized_channel_test.go +++ b/exporter/exporterqueue/sized_channel_test.go @@ -16,7 +16,7 @@ func (s sizerInt) Sizeof(el int) int64 { return int64(el) } -func TestSizedCapacityChannel(t *testing.T) { +func TestSizedChannel(t *testing.T) { q := newSizedChannel[int](7, sizerInt{}) require.NoError(t, q.push(1)) assert.Equal(t, 1, q.Size()) @@ -45,12 +45,7 @@ func TestSizedCapacityChannel(t *testing.T) { assert.Equal(t, 0, el) } -func TestSizedCapacityChannel_Offer_sizedNotFullButChannelFull(t *testing.T) { +func TestSizedChannel_OfferInvalidSize(t *testing.T) { q := newSizedChannel[int](1, sizerInt{}) - require.NoError(t, q.push(1)) - - q.used.Store(0) - err := q.push(1) - require.Error(t, err) - assert.Equal(t, ErrQueueIsFull, err) + require.ErrorIs(t, q.push(0), errInvalidSize) }