diff --git a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go index bb0cda216fad..29dd25b125c8 100644 --- a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go +++ b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go @@ -101,13 +101,19 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC } }() - ids := generateSequentialIds(10000) + ids := generateSequentialIds(100000) wg := &sync.WaitGroup{} + // Limit the concurrency here to avoid creating too many goroutines and hit + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9126 + concurrencyLimiter := make(chan struct{}, 128) + defer close(concurrencyLimiter) for i := 0; i < len(ids); i++ { wg.Add(1) + concurrencyLimiter <- struct{}{} go func(id pcommon.TraceID) { batcher.AddToCurrentBatch(id) wg.Done() + <-concurrencyLimiter }(ids[i]) } diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index f5da628dc503..e01aa6497618 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -191,7 +191,7 @@ func TestSequentialTraceArrival(t *testing.T) { } func TestConcurrentTraceArrival(t *testing.T) { - traceIds, batches := generateIdsAndBatches(128) + traceIds, batches := generateIdsAndBatches(2048) var wg sync.WaitGroup cfg := Config{ @@ -208,16 +208,24 @@ func TestConcurrentTraceArrival(t *testing.T) { require.NoError(t, tsp.Shutdown(context.Background())) }() + // Limit the concurrency here to avoid creating too many goroutines and hit + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9126 + concurrencyLimiter := make(chan struct{}, 128) + defer close(concurrencyLimiter) for _, batch := range batches { // Add the same traceId twice. wg.Add(2) + concurrencyLimiter <- struct{}{} go func(td ptrace.Traces) { require.NoError(t, tsp.ConsumeTraces(context.Background(), td)) wg.Done() + <-concurrencyLimiter }(batch) + concurrencyLimiter <- struct{}{} go func(td ptrace.Traces) { require.NoError(t, tsp.ConsumeTraces(context.Background(), td)) wg.Done() + <-concurrencyLimiter }(batch) }