Skip to content

Commit

Permalink
[chore] [exporterhelper] Move workers from queue to queueSender (#8898)
Browse files Browse the repository at this point in the history
Move the common workers loop logic from each queue implementation to the
sender.

Based on
#8828
  • Loading branch information
dmitryax authored Nov 15, 2023
1 parent 6bbffee commit df6448b
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 143 deletions.
46 changes: 15 additions & 31 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"sync"
"sync/atomic"

"go.opentelemetry.io/collector/component"
Expand All @@ -17,41 +16,21 @@ import (
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
stopWG sync.WaitGroup
stopped *atomic.Bool
items chan QueueRequest[T]
numConsumers int
component.StartFunc
stopped *atomic.Bool
items chan QueueRequest[T]
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity. Capacity cannot be 0.
func NewBoundedMemoryQueue[T any](capacity int, numConsumers int) Queue[T] {
// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](capacity int) Queue[T] {
return &boundedMemoryQueue[T]{
items: make(chan QueueRequest[T], capacity),
stopped: &atomic.Bool{},
numConsumers: numConsumers,
items: make(chan QueueRequest[T], capacity),
stopped: &atomic.Bool{},
}
}

// Start starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *boundedMemoryQueue[T]) Start(_ context.Context, _ component.Host, set QueueSettings[T]) error {
var startWG sync.WaitGroup
for i := 0; i < q.numConsumers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func() {
startWG.Done()
defer q.stopWG.Done()
for item := range q.items {
set.Callback(item)
}
}()
}
startWG.Wait()
return nil
}

// Offer is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
// Offer is used by the producer to submit new item to the queue.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
if q.stopped.Load() {
return ErrQueueIsStopped
Expand All @@ -65,11 +44,16 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
}
}

// Poll returns a request from the queue once it's available. It returns false if the queue is stopped.
func (q *boundedMemoryQueue[T]) Poll() (QueueRequest[T], bool) {
item, ok := <-q.items
return item, ok
}

// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.stopped.Store(true) // disable producer
close(q.items)
q.stopWG.Wait()
return nil
}

Expand Down
67 changes: 34 additions & 33 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,30 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
)

func newNopQueueSettings[T any]() QueueSettings[T] {
return newQueueSettings(func(request QueueRequest[T]) {
request.OnProcessingFinished()
})
}

func newQueueSettings[T any](callback func(QueueRequest[T])) QueueSettings[T] {
return QueueSettings[T]{
DataType: component.DataTypeMetrics,
Callback: callback,
}
}

// In this test we run a queue with capacity 1 and a single consumer.
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
q := NewBoundedMemoryQueue[string](1, 1)
q := NewBoundedMemoryQueue[string](1)

var startLock sync.Mutex

startLock.Lock() // block consumers
consumerState := newConsumerState(t)

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newQueueSettings(func(item QueueRequest[string]) {
defer item.OnProcessingFinished()
consumerState.record(item.Request)
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) {
consumerState.record(item)

// block further processing until startLock is released
startLock.Lock()
//nolint:staticcheck // SA2001 ignore this!
startLock.Unlock()
})))
})
consumers.Start()

assert.NoError(t, q.Offer(context.Background(), "a"))

Expand Down Expand Up @@ -93,6 +80,7 @@ func TestBoundedQueue(t *testing.T) {
}

assert.NoError(t, q.Shutdown(context.Background()))
consumers.Shutdown()
assert.ErrorIs(t, q.Offer(context.Background(), "x"), ErrQueueIsStopped)
}

Expand All @@ -103,15 +91,16 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue[string](10, 1)
q := NewBoundedMemoryQueue[string](10)

consumerState := newConsumerState(t)

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newQueueSettings(func(item QueueRequest[string]) {
defer item.OnProcessingFinished()
consumerState.record(item.Request)
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) {
consumerState.record(item)
time.Sleep(1 * time.Second)
})))
})
consumers.Start()

assert.NoError(t, q.Offer(context.Background(), "a"))
assert.NoError(t, q.Offer(context.Background(), "b"))
Expand All @@ -125,6 +114,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
assert.NoError(t, q.Offer(context.Background(), "j"))

assert.NoError(t, q.Shutdown(context.Background()))
consumers.Shutdown()

assert.ErrorIs(t, q.Offer(context.Background(), "x"), ErrQueueIsStopped)
consumerState.assertConsumed(map[string]bool{
Expand All @@ -142,6 +132,13 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
assert.Equal(t, 0, q.Size())
}

func TestZeroSize(t *testing.T) {
q := NewBoundedMemoryQueue[string](0)
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
assert.ErrorIs(t, q.Offer(context.Background(), "a"), ErrQueueIsFull)
assert.NoError(t, q.Shutdown(context.Background()))
}

func Benchmark_QueueUsage_10000_1_50000(b *testing.B) {
queueUsage(b, 10000, 1, 50000)
}
Expand Down Expand Up @@ -187,16 +184,17 @@ func Benchmark_QueueUsage_10000_10_250000(b *testing.B) {
func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
q := NewBoundedMemoryQueue[string](capacity, numConsumers)
err := q.Start(context.Background(), componenttest.NewNopHost(), newQueueSettings(func(item QueueRequest[string]) {
defer item.OnProcessingFinished()
q := NewBoundedMemoryQueue[string](capacity)
require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost()))
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) {
time.Sleep(1 * time.Millisecond)
}))
require.NoError(b, err)
})
consumers.Start()
for j := 0; j < numberOfItems; j++ {
_ = q.Offer(context.Background(), fmt.Sprintf("%d", j))
}
assert.NoError(b, q.Shutdown(context.Background()))
consumers.Shutdown()
}
}

Expand Down Expand Up @@ -246,20 +244,23 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
}

func TestZeroSizeWithConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](0, 1)
q := NewBoundedMemoryQueue[string](0)

err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings[string]())
err := q.Start(context.Background(), componenttest.NewNopHost())
consumers := NewQueueConsumers(q, 1, func(context.Context, string) {})
consumers.Start()
assert.NoError(t, err)

assert.NoError(t, q.Offer(context.Background(), "a")) // in process

assert.NoError(t, q.Shutdown(context.Background()))
consumers.Shutdown()
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](0, 0)
q := NewBoundedMemoryQueue[string](0)

err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings[string]())
err := q.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)

assert.ErrorIs(t, q.Offer(context.Background(), "a"), ErrQueueIsFull) // in process
Expand Down
52 changes: 52 additions & 0 deletions exporter/exporterhelper/internal/consumers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context"
"sync"
)

type QueueConsumers[T any] struct {
queue Queue[T]
numConsumers int
callback func(context.Context, T)
stopWG sync.WaitGroup
}

func NewQueueConsumers[T any](q Queue[T], numConsumers int, callback func(context.Context, T)) *QueueConsumers[T] {
return &QueueConsumers[T]{
queue: q,
numConsumers: numConsumers,
callback: callback,
stopWG: sync.WaitGroup{},
}
}

// Start ensures that all consumers are started.
func (c *QueueConsumers[T]) Start() {
var startWG sync.WaitGroup
for i := 0; i < c.numConsumers; i++ {
c.stopWG.Add(1)
startWG.Add(1)
go func() {
startWG.Done()
defer c.stopWG.Done()
for {
item, success := c.queue.Poll()
if !success {
return
}
c.callback(item.Context, item.Request)
item.OnProcessingFinished()
}
}()
}
startWG.Wait()
}

// Shutdown ensures that all consumers are stopped.
func (c *QueueConsumers[T]) Shutdown() {
c.stopWG.Wait()
}
36 changes: 7 additions & 29 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
import (
"context"
"errors"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -21,52 +20,31 @@ var (
// persistentQueue holds the queue backed by file storage
type persistentQueue[T any] struct {
*persistentContiguousStorage[T]
stopWG sync.WaitGroup
set exporter.CreateSettings
storageID component.ID
numConsumers int
set exporter.CreateSettings
storageID component.ID
dataType component.DataType
}

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue[T any](capacity int, numConsumers int, storageID component.ID, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] {
func NewPersistentQueue[T any](capacity int, dataType component.DataType, storageID component.ID, marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] {
return &persistentQueue[T]{
persistentContiguousStorage: newPersistentContiguousStorage(set.Logger, uint64(capacity), marshaler, unmarshaler),
numConsumers: numConsumers,
set: set,
storageID: storageID,
dataType: dataType,
}
}

// Start starts the persistentQueue with the given number of consumers.
func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host, set QueueSettings[T]) error {
storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, set.DataType)
func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) error {
storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, pq.dataType)
if err != nil {
return err
}
pq.persistentContiguousStorage.start(ctx, storageClient)
for i := 0; i < pq.numConsumers; i++ {
pq.stopWG.Add(1)
go func() {
defer pq.stopWG.Done()
for {
req, found := pq.persistentContiguousStorage.get()
if !found {
return
}
set.Callback(req)
}
}()
}
return nil
}

// Shutdown stops accepting items, shuts down the queue and closes the persistent queue
func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
err := pq.persistentContiguousStorage.Shutdown(ctx)
pq.stopWG.Wait()
return err
}

func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {
ext, found := host.GetExtensions()[storageID]
if !found {
Expand Down
Loading

0 comments on commit df6448b

Please sign in to comment.