Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] [exporterhelper] Make memory queue benchmarks deteministic #9247

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 22 additions & 45 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package internal

import (
"context"
"errors"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -97,75 +96,53 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
}))
}

func Benchmark_QueueUsage_10000_requests_1_50000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 50000)
func Benchmark_QueueUsage_1000_requests(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 1000)
}

func Benchmark_QueueUsage_10000_requests_10_50000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 50000)
func Benchmark_QueueUsage_100000_requests(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 100000)
}

func Benchmark_QueueUsage_50000_requests_1_50000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 1, 50000)
func Benchmark_QueueUsage_10000_items(b *testing.B) {
// each request has 10 items: 1000 requests = 10000 items
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000)
}

func Benchmark_QueueUsage_50000_requests_10_50000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 50000, 10, 50000)
}

func Benchmark_QueueUsage_10000_requests_1_250000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 1, 250000)
}

func Benchmark_QueueUsage_10000_requests_10_250000(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 10000, 10, 250000)
}

func Benchmark_QueueUsage_1M_items_10_250k(b *testing.B) {
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 250000)
}

func Benchmark_QueueUsage_1M_items_10_1M(b *testing.B) {
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 1000000, 10, 1000000)
}

func Benchmark_QueueUsage_100M_items_10_10M(b *testing.B) {
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000000, 10, 10000000)
func Benchmark_QueueUsage_1M_items(b *testing.B) {
// each request has 10 items: 100000 requests = 1M items
benchmarkQueueUsage(b, &ItemsSizer[fakeReq]{}, 100000)
}

func TestQueueUsage(t *testing.T) {
t.Run("with enough workers", func(t *testing.T) {
queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 5, 1000)
t.Run("requests_based", func(t *testing.T) {
queueUsage(t, &RequestSizer[fakeReq]{}, 10)
})
t.Run("past capacity", func(t *testing.T) {
queueUsage(t, &RequestSizer[fakeReq]{}, 10000, 2, 50000)
t.Run("items_based", func(t *testing.T) {
queueUsage(t, &ItemsSizer[fakeReq]{}, 10)
})
}

func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], capacity int, numConsumers int,
numberOfItems int) {
func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
queueUsage(b, sizer, capacity, numConsumers, numberOfItems)
queueUsage(b, sizer, requestsCount)
}
}

func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers int, numberOfItems int) {
func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) {
var wg sync.WaitGroup
wg.Add(numberOfItems)
q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: capacity})
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error {
wg.Add(requestsCount)
q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: 10 * requestsCount})
consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error {
wg.Done()
return nil
})
require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < numberOfItems; j++ {
if err := q.Offer(context.Background(), fakeReq{10}); errors.Is(err, ErrQueueIsFull) {
wg.Done()
}
for j := 0; j < requestsCount; j++ {
require.NoError(tb, q.Offer(context.Background(), fakeReq{10}))
}
assert.NoError(tb, consumers.Shutdown(context.Background()))

wg.Wait()
}

Expand Down
Loading