Skip to content

Commit

Permalink
Merge branch 'main' into addNewTelemetrySettings
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdandrutu authored Oct 25, 2024
2 parents 7b2bae9 + ef79a0e commit 60d05c1
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 9 deletions.
6 changes: 6 additions & 0 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin

// Start is invoked during service startup.
func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
if err := qs.queue.Start(ctx, host); err != nil {
return err
}
if err := qs.consumers.Start(ctx, host); err != nil {
return err
}
Expand All @@ -117,6 +120,9 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
func (qs *QueueSender) Shutdown(ctx context.Context) error {
// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.
if err := qs.queue.Shutdown(ctx); err != nil {
return err
}
return qs.consumers.Shutdown(ctx)
}

Expand Down
2 changes: 2 additions & 0 deletions exporter/internal/queue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) {
wg.Done()
return nil
})
require.NoError(tb, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < requestsCount; j++ {
require.NoError(tb, q.Offer(context.Background(), fakeReq{10}))
}
assert.NoError(tb, q.Shutdown(context.Background()))
assert.NoError(tb, consumers.Shutdown(context.Background()))
wg.Wait()
}
Expand Down
11 changes: 2 additions & 9 deletions exporter/internal/queue/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(con
}

// Start ensures that queue and all consumers are started.
func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
if err := qc.queue.Start(ctx, host); err != nil {
return err
}

func (qc *Consumers[T]) Start(_ context.Context, _ component.Host) error {
var startWG sync.WaitGroup
for i := 0; i < qc.numConsumers; i++ {
qc.stopWG.Add(1)
Expand All @@ -55,10 +51,7 @@ func (qc *Consumers[T]) Start(ctx context.Context, host component.Host) error {
}

// Shutdown ensures that queue and all consumers are stopped.
func (qc *Consumers[T]) Shutdown(ctx context.Context) error {
if err := qc.queue.Shutdown(ctx); err != nil {
return err
}
func (qc *Consumers[T]) Shutdown(_ context.Context) error {
qc.stopWG.Wait()
return nil
}
2 changes: 2 additions & 0 deletions exporter/internal/queue/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest],
{}: NewMockStorageExtension(nil),
}}
consumers := NewQueueConsumers(pq, numConsumers, consumeFunc)
require.NoError(t, pq.Start(context.Background(), host))
require.NoError(t, consumers.Start(context.Background(), host))
t.Cleanup(func() {
require.NoError(t, pq.Shutdown(context.Background()))
assert.NoError(t, consumers.Shutdown(context.Background()))
})
return pq
Expand Down

0 comments on commit 60d05c1

Please sign in to comment.