Skip to content

Commit

Permalink
[chore] Cleanup sized channel, no more exceptions for persistent queue (
Browse files Browse the repository at this point in the history
open-telemetry#12069)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Jan 10, 2025
1 parent 7425fe8 commit e6df9d8
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 84 deletions.
111 changes: 65 additions & 46 deletions exporter/exporterqueue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,55 +99,38 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
}))
}

func Benchmark_QueueUsage_1000_requests(b *testing.B) {
benchmarkQueueUsage(b, &requestSizer[ptrace.Traces]{}, 1000)
}

func Benchmark_QueueUsage_100000_requests(b *testing.B) {
benchmarkQueueUsage(b, &requestSizer[ptrace.Traces]{}, 100000)
}

func Benchmark_QueueUsage_10000_items(b *testing.B) {
// each request has 10 items: 1000 requests = 10000 items
benchmarkQueueUsage(b, &itemsSizer{}, 1000)
}

func Benchmark_QueueUsage_1M_items(b *testing.B) {
// each request has 10 items: 100000 requests = 1M items
benchmarkQueueUsage(b, &itemsSizer{}, 100000)
}

func TestQueueUsage(t *testing.T) {
t.Run("requests_based", func(t *testing.T) {
queueUsage(t, &requestSizer[ptrace.Traces]{}, 10)
})
t.Run("items_based", func(t *testing.T) {
queueUsage(t, &itemsSizer{}, 10)
})
}

func benchmarkQueueUsage(b *testing.B, sizer sizer[ptrace.Traces], requestsCount int) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
queueUsage(b, sizer, requestsCount)
tests := []struct {
name string
sizer sizer[ptrace.Traces]
}{
{
name: "requests_based",
sizer: &requestSizer[ptrace.Traces]{},
},
{
name: "items_based",
sizer: &itemsSizer{},
},
}
}

func queueUsage(tb testing.TB, sizer sizer[ptrace.Traces], requestsCount int) {
q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: sizer, capacity: int64(10 * requestsCount)})
consumed := &atomic.Int64{}
require.NoError(tb, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error {
consumed.Add(1)
return nil
})
td := testdata.GenerateTraces(10)
for j := 0; j < requestsCount; j++ {
require.NoError(tb, q.Offer(context.Background(), td))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: tt.sizer, capacity: int64(100)})
consumed := &atomic.Int64{}
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error {
consumed.Add(1)
return nil
})
td := testdata.GenerateTraces(10)
for j := 0; j < 10; j++ {
require.NoError(t, q.Offer(context.Background(), td))
}
assert.NoError(t, q.Shutdown(context.Background()))
assert.NoError(t, ac.Shutdown(context.Background()))
assert.Equal(t, int64(10), consumed.Load())
})
}
assert.NoError(tb, q.Shutdown(context.Background()))
assert.NoError(tb, ac.Shutdown(context.Background()))
assert.Equal(tb, int64(requestsCount), consumed.Load())
}

func TestZeroSizeNoConsumers(t *testing.T) {
Expand Down Expand Up @@ -200,3 +183,39 @@ func (qc *asyncConsumer) Shutdown(_ context.Context) error {
qc.stopWG.Wait()
return nil
}

func BenchmarkOffer(b *testing.B) {
tests := []struct {
name string
sizer sizer[ptrace.Traces]
}{
{
name: "requests_based",
sizer: &requestSizer[ptrace.Traces]{},
},
{
name: "items_based",
sizer: &itemsSizer{},
},
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
q := newBoundedMemoryQueue[ptrace.Traces](memoryQueueSettings[ptrace.Traces]{sizer: &requestSizer[ptrace.Traces]{}, capacity: int64(10 * b.N)})
consumed := &atomic.Int64{}
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(context.Context, ptrace.Traces) error {
consumed.Add(1)
return nil
})
td := testdata.GenerateTraces(10)
b.ResetTimer()
b.ReportAllocs()
for j := 0; j < b.N; j++ {
require.NoError(b, q.Offer(context.Background(), td))
}
assert.NoError(b, q.Shutdown(context.Background()))
assert.NoError(b, ac.Shutdown(context.Background()))
assert.Equal(b, int64(b.N), consumed.Load())
})
}
}
49 changes: 19 additions & 30 deletions exporter/exporterqueue/sized_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,50 @@

package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

import "sync/atomic"
import (
"errors"
"sync/atomic"
)

var errInvalidSize = errors.New("invalid element size")

// sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
// The channel will accept elements until the total size of the elements reaches the capacity.
type sizedChannel[T any] struct {
sizer sizer[T]
used *atomic.Int64

// We need to store the capacity in a separate field because the capacity of the channel can be higher.
// It happens when we restore a persistent queue from a disk that is bigger than the pre-configured capacity.
cap int64
ch chan T
ch chan T
}

// newSizedChannel creates a sized elements channel. Each element is assigned a size by the provided sizer.
// chanCapacity is the capacity of the underlying channel which usually should be equal to the capacity of the queue to
// avoid blocking the producer. Optionally, the channel can be preloaded with the elements and their total size.
// newSizedChannel creates a sized elements channel. Each element is assigned a positive size by the provided sizer.
// capacity is the total capacity of the queue.
func newSizedChannel[T any](capacity int64, sizer sizer[T]) *sizedChannel[T] {
used := &atomic.Int64{}

ch := make(chan T, capacity)
return &sizedChannel[T]{
sizer: sizer,
used: used,
cap: capacity,
ch: ch,
used: &atomic.Int64{},
ch: make(chan T, capacity),
}
}

// push puts the element into the queue with the given sized if there is enough capacity.
// Returns an error if the queue is full. The callback is called before the element is committed to the queue.
// If the callback returns an error, the element is not put into the queue and the error is returned.
// The size is the size of the element MUST be positive.
// Returns an error if the queue is full.
func (vcq *sizedChannel[T]) push(el T) error {
elSize := vcq.sizer.Sizeof(el)
if vcq.used.Add(elSize) > vcq.cap {
vcq.used.Add(-elSize)
return ErrQueueIsFull
if elSize <= 0 {
return errInvalidSize
}

select {
// for persistent queue implementation, channel len can be out of sync with used size. Attempt to put it
// into the channel. If it is full, simply returns ErrQueueIsFull error. This prevents potential deadlock issues.
case vcq.ch <- el:
return nil
default:
if vcq.used.Add(elSize) > int64(cap(vcq.ch)) {
vcq.used.Add(-elSize)
return ErrQueueIsFull
}

vcq.ch <- el
return nil
}

// pop removes the element from the queue and returns it.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
// The callback is called before the element is removed from the queue. It must return the size of the element.
func (vcq *sizedChannel[T]) pop() (T, bool) {
el, ok := <-vcq.ch
if !ok {
Expand All @@ -78,5 +67,5 @@ func (vcq *sizedChannel[T]) Size() int {
}

func (vcq *sizedChannel[T]) Capacity() int {
return int(vcq.cap)
return cap(vcq.ch)
}
11 changes: 3 additions & 8 deletions exporter/exporterqueue/sized_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (s sizerInt) Sizeof(el int) int64 {
return int64(el)
}

func TestSizedCapacityChannel(t *testing.T) {
func TestSizedChannel(t *testing.T) {
q := newSizedChannel[int](7, sizerInt{})
require.NoError(t, q.push(1))
assert.Equal(t, 1, q.Size())
Expand Down Expand Up @@ -45,12 +45,7 @@ func TestSizedCapacityChannel(t *testing.T) {
assert.Equal(t, 0, el)
}

func TestSizedCapacityChannel_Offer_sizedNotFullButChannelFull(t *testing.T) {
func TestSizedChannel_OfferInvalidSize(t *testing.T) {
q := newSizedChannel[int](1, sizerInt{})
require.NoError(t, q.push(1))

q.used.Store(0)
err := q.push(1)
require.Error(t, err)
assert.Equal(t, ErrQueueIsFull, err)
require.ErrorIs(t, q.push(0), errInvalidSize)
}

0 comments on commit e6df9d8

Please sign in to comment.