From 786299f4b2919c5de711de6f3c89dce483778034 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 10 Nov 2023 10:04:21 -0600 Subject: [PATCH] Simplify logic in boundedMemoryQueue, use channels len/cap (#8829) Signed-off-by: Bogdan Drutu --- .chloggen/simplifyqueue.yaml | 13 ++++++++++ .../internal/bounded_memory_queue.go | 24 ++++--------------- .../internal/bounded_memory_queue_test.go | 15 +++++++++++- 3 files changed, 31 insertions(+), 21 deletions(-) create mode 100755 .chloggen/simplifyqueue.yaml diff --git a/.chloggen/simplifyqueue.yaml b/.chloggen/simplifyqueue.yaml new file mode 100755 index 00000000000..eef66b77e98 --- /dev/null +++ b/.chloggen/simplifyqueue.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: 'exporterhelper' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Simplify logic in boundedMemoryQueue, use channels len/cap" + +# One or more tracking issues or pull requests related to the change +issues: [8829] diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index c7f8655338a..e5ae9c1bbfe 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -18,26 +18,21 @@ import ( // the producer are dropped. type boundedMemoryQueue struct { stopWG sync.WaitGroup - size *atomic.Uint32 stopped *atomic.Bool items chan Request - capacity uint32 numConsumers int } -// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional -// callback for dropped items (e.g. useful to emit metrics). +// NewBoundedMemoryQueue constructs the new queue of specified capacity. Capacity cannot be 0. func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue { return &boundedMemoryQueue{ items: make(chan Request, capacity), stopped: &atomic.Bool{}, - size: &atomic.Uint32{}, - capacity: uint32(capacity), numConsumers: numConsumers, } } -// StartConsumers starts a given number of goroutines consuming items from the queue +// Start starts a given number of goroutines consuming items from the queue // and passing them into the consumer callback. func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error { var startWG sync.WaitGroup @@ -48,7 +43,6 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu startWG.Done() defer q.stopWG.Done() for item := range q.items { - q.size.Add(^uint32(0)) set.Callback(item) } }() @@ -63,20 +57,10 @@ func (q *boundedMemoryQueue) Produce(item Request) bool { return false } - // we might have two concurrent backing queues at the moment - // their combined size is stored in q.size, and their combined capacity - // should match the capacity of the new queue - if q.size.Load() >= q.capacity { - return false - } - - q.size.Add(1) select { case q.items <- item: return true default: - // should not happen, as overflows should have been captured earlier - q.size.Add(^uint32(0)) return false } } @@ -91,11 +75,11 @@ func (q *boundedMemoryQueue) Stop() { // Size returns the current size of the queue func (q *boundedMemoryQueue) Size() int { - return int(q.size.Load()) + return len(q.items) } func (q *boundedMemoryQueue) Capacity() int { - return int(q.capacity) + return cap(q.items) } func (q *boundedMemoryQueue) IsPersistent() bool { diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index 04f641346b1..15efcaff6cc 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -247,11 +247,24 @@ func (s *consumerState) assertConsumed(expected map[string]bool) { assert.Equal(s.t, expected, s.snapshot()) } -func TestZeroSize(t *testing.T) { +func TestZeroSizeWithConsumers(t *testing.T) { q := NewBoundedMemoryQueue(0, 1) err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {})) assert.NoError(t, err) + assert.True(t, q.Produce(newStringRequest("a"))) // in process + + q.Stop() +} + +func TestZeroSizeNoConsumers(t *testing.T) { + q := NewBoundedMemoryQueue(0, 0) + + err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {})) + assert.NoError(t, err) + assert.False(t, q.Produce(newStringRequest("a"))) // in process + + q.Stop() }