From 2d054e55d39b55160b85fa7deb2d4bb063230ead Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Mon, 20 Jan 2025 16:41:01 -0800 Subject: [PATCH] Cache item size --- exporter/exporterhelper/logs.go | 19 +++++-- exporter/exporterhelper/logs_batch.go | 15 +++-- exporter/exporterhelper/logs_batch_test.go | 56 ++++++++++++++++++- exporter/exporterhelper/metrics.go | 19 +++++-- exporter/exporterhelper/metrics_batch.go | 15 +++-- exporter/exporterhelper/metrics_batch_test.go | 56 ++++++++++++++++++- exporter/exporterhelper/traces.go | 19 +++++-- exporter/exporterhelper/traces_batch.go | 13 ++++- exporter/exporterhelper/traces_batch_test.go | 56 ++++++++++++++++++- 9 files changed, 239 insertions(+), 29 deletions(-) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 74a658b98fe..a73e28282e5 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -25,14 +25,16 @@ var ( ) type logsRequest struct { - ld plog.Logs - pusher consumer.ConsumeLogsFunc + ld plog.Logs + pusher consumer.ConsumeLogsFunc + cachedItemsCount int } func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request { return &logsRequest{ - ld: ld, - pusher: pusher, + ld: ld, + pusher: pusher, + cachedItemsCount: 0, } } @@ -63,7 +65,14 @@ func (req *logsRequest) Export(ctx context.Context) error { } func (req *logsRequest) ItemsCount() int { - return req.ld.LogRecordCount() + if req.cachedItemsCount == 0 { + req.cachedItemsCount = req.ld.LogRecordCount() + } + return req.cachedItemsCount +} + +func (req *logsRequest) setCachedItemsCount(count int) { + req.cachedItemsCount = count } type logsExporter struct { diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index 4e4609b18ca..36c3ad62658 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -38,12 +38,14 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz continue } - 
srcCount := srcReq.ld.LogRecordCount() + srcCount := srcReq.ItemsCount() if srcCount <= capacityLeft { if destReq == nil { destReq = srcReq } else { + destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount) + srcReq.setCachedItemsCount(0) srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) } capacityLeft -= srcCount continue @@ -51,16 +53,21 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz for { extractedLogs := extractLogs(srcReq.ld, capacityLeft) - if extractedLogs.LogRecordCount() == 0 { + extractedCount := extractedLogs.LogRecordCount() + if extractedCount == 0 { break } - capacityLeft -= extractedLogs.LogRecordCount() + if destReq == nil { - destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher} + destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher, cachedItemsCount: extractedCount} } else { + destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount) + srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount) extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) } + // Create new batch once capacity is reached. + capacityLeft -= extractedCount if capacityLeft == 0 { res = append(res, destReq) destReq = nil diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index d05d87764ee..9a77077bac8 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -123,7 +123,7 @@ func TestMergeSplitLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i, r := range res { - assert.Equal(t, tt.expected[i], r.(*logsRequest)) + assert.Equal(t, tt.expected[i].ld, r.(*logsRequest).ld) } }) } @@ -152,3 +152,57 @@ func TestExtractLogs(t *testing.T) { assert.Equal(t, 10-i, ld.LogRecordCount()) } } + +func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) { + // All requests merge into a single batch. 
+ cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + merged := []Request{&logsRequest{ld: testdata.GenerateLogs(10)}} + for j := 0; j < 1000; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(10)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(b, merged, 1) + } +} + +func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) { + // Every incoming request results in a split. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + merged := []Request{&logsRequest{ld: testdata.GenerateLogs(0)}} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(10001)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(b, merged, 11) + } +} + +func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyBelowLimit(b *testing.B) { + // Every incoming request results in a split. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + merged := []Request{&logsRequest{ld: testdata.GenerateLogs(0)}} + for j := 0; j < 10; j++ { + lr2 := &logsRequest{ld: testdata.GenerateLogs(9999)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(b, merged, 10) + } +} + +func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) { + // One request splits into many batches. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + merged := []Request{&logsRequest{ld: testdata.GenerateLogs(0)}} + lr2 := &logsRequest{ld: testdata.GenerateLogs(100000)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) 
+ assert.Len(b, merged, 10) + } +} diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index e3d4ccbd2ae..18c39fa3db5 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -25,14 +25,16 @@ var ( ) type metricsRequest struct { - md pmetric.Metrics - pusher consumer.ConsumeMetricsFunc + md pmetric.Metrics + pusher consumer.ConsumeMetricsFunc + cachedItemsCount int } func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request { return &metricsRequest{ - md: md, - pusher: pusher, + md: md, + pusher: pusher, + cachedItemsCount: 0, } } @@ -63,7 +65,14 @@ func (req *metricsRequest) Export(ctx context.Context) error { } func (req *metricsRequest) ItemsCount() int { - return req.md.DataPointCount() + if req.cachedItemsCount == 0 { + req.cachedItemsCount = req.md.DataPointCount() + } + return req.cachedItemsCount +} + +func (req *metricsRequest) setCachedItemsCount(count int) { + req.cachedItemsCount = count } type metricsExporter struct { diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go index 3ec240d40a6..a710254ea89 100644 --- a/exporter/exporterhelper/metrics_batch.go +++ b/exporter/exporterhelper/metrics_batch.go @@ -38,11 +38,13 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max continue } - srcCount := srcReq.md.DataPointCount() + srcCount := srcReq.ItemsCount() if srcCount <= capacityLeft { if destReq == nil { destReq = srcReq } else { + destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount) + srcReq.setCachedItemsCount(0) srcReq.md.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics()) } capacityLeft -= srcCount @@ -51,16 +53,21 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max for { extractedMetrics := extractMetrics(srcReq.md, capacityLeft) - if extractedMetrics.DataPointCount() == 0 { + extractedCount := extractedMetrics.DataPointCount() + if extractedCount == 0 
{ break } - capacityLeft -= extractedMetrics.DataPointCount() + if destReq == nil { - destReq = &metricsRequest{md: extractedMetrics, pusher: srcReq.pusher} + destReq = &metricsRequest{md: extractedMetrics, pusher: srcReq.pusher, cachedItemsCount: extractedCount} } else { + destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount) + srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount) extractedMetrics.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics()) } + // Create new batch once capacity is reached. + capacityLeft -= extractedCount if capacityLeft == 0 { res = append(res, destReq) destReq = nil diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go index d6f583a6844..08fc1fab425 100644 --- a/exporter/exporterhelper/metrics_batch_test.go +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -123,7 +123,7 @@ func TestMergeSplitMetrics(t *testing.T) { require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i := range res { - assert.Equal(t, tt.expected[i], res[i].(*metricsRequest)) + assert.Equal(t, tt.expected[i].md, res[i].(*metricsRequest).md) } }) } @@ -159,3 +159,57 @@ func TestExtractMetricsInvalidMetric(t *testing.T) { assert.Equal(t, testdata.GenerateMetricsMetricTypeInvalid(), extractedMetrics) assert.Equal(t, 0, md.ResourceMetrics().Len()) } + +func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) { + // All requests merge into a single batch. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + for i := 0; i < b.N; i++ { + merged := []Request{&metricsRequest{md: testdata.GenerateMetrics(10)}} + for j := 0; j < 1000; j++ { + lr2 := &metricsRequest{md: testdata.GenerateMetrics(10)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) 
+ } + assert.Len(b, merged, 1) + } +} + +func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing.B) { + // Every incoming request results in a split. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + for i := 0; i < b.N; i++ { + merged := []Request{&metricsRequest{md: testdata.GenerateMetrics(0)}} + for j := 0; j < 10; j++ { + lr2 := &metricsRequest{md: testdata.GenerateMetrics(10001)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(b, merged, 11) + } +} + +func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyBelowLimit(b *testing.B) { + // Every incoming request results in a split. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + for i := 0; i < b.N; i++ { + merged := []Request{&metricsRequest{md: testdata.GenerateMetrics(0)}} + for j := 0; j < 10; j++ { + lr2 := &metricsRequest{md: testdata.GenerateMetrics(9999)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(b, merged, 10) + } +} + +func BenchmarkSplittingBasedOnItemCountHugeMetrics(b *testing.B) { + // One request splits into many batches. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000} + for i := 0; i < b.N; i++ { + merged := []Request{&metricsRequest{md: testdata.GenerateMetrics(0)}} + lr2 := &metricsRequest{md: testdata.GenerateMetrics(100000)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) 
+ assert.Len(b, merged, 10) + } +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index f8387d5a3b8..f2460305c6a 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -25,14 +25,16 @@ var ( ) type tracesRequest struct { - td ptrace.Traces - pusher consumer.ConsumeTracesFunc + td ptrace.Traces + pusher consumer.ConsumeTracesFunc + cachedItemsCount int } func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) Request { return &tracesRequest{ - td: td, - pusher: pusher, + td: td, + pusher: pusher, + cachedItemsCount: 0, } } @@ -63,7 +65,14 @@ func (req *tracesRequest) Export(ctx context.Context) error { } func (req *tracesRequest) ItemsCount() int { - return req.td.SpanCount() + if req.cachedItemsCount == 0 { + req.cachedItemsCount = req.td.SpanCount() + } + return req.cachedItemsCount +} + +func (req *tracesRequest) setCachedItemsCount(count int) { + req.cachedItemsCount = count } type tracesExporter struct { diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go index 07a3025d73a..f774044c2c1 100644 --- a/exporter/exporterhelper/traces_batch.go +++ b/exporter/exporterhelper/traces_batch.go @@ -38,11 +38,13 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS continue } - srcCount := srcReq.td.SpanCount() + srcCount := srcReq.ItemsCount() if srcCount <= capacityLeft { if destReq == nil { destReq = srcReq } else { + destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount) + srcReq.setCachedItemsCount(0) srcReq.td.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans()) } capacityLeft -= srcCount @@ -51,16 +53,21 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS for { extractedTraces := extractTraces(srcReq.td, capacityLeft) - if extractedTraces.SpanCount() == 0 { + extractedCount := extractedTraces.SpanCount() + if extractedCount == 0 { break } - capacityLeft -= extractedTraces.SpanCount() + 
if destReq == nil { - destReq = &tracesRequest{td: extractedTraces, pusher: srcReq.pusher} + destReq = &tracesRequest{td: extractedTraces, pusher: srcReq.pusher, cachedItemsCount: extractedCount} } else { + destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount) + srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount) extractedTraces.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans()) } + // Create new batch once capacity is reached. + capacityLeft -= extractedCount if capacityLeft == 0 { res = append(res, destReq) destReq = nil diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go index 2d84f254ed9..e23f510511e 100644 --- a/exporter/exporterhelper/traces_batch_test.go +++ b/exporter/exporterhelper/traces_batch_test.go @@ -130,7 +130,7 @@ func TestMergeSplitTraces(t *testing.T) { require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i := range res { - assert.Equal(t, tt.expected[i], res[i].(*tracesRequest)) + assert.Equal(t, tt.expected[i].td, res[i].(*tracesRequest).td) } }) } @@ -159,3 +159,57 @@ func TestExtractTraces(t *testing.T) { assert.Equal(t, 10-i, td.SpanCount()) } } + +func BenchmarkSplittingBasedOnItemCountManySmallTraces(b *testing.B) { + // All requests merge into a single batch. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + merged := []Request{&tracesRequest{td: testdata.GenerateTraces(10)}} + for j := 0; j < 1000; j++ { + lr2 := &tracesRequest{td: testdata.GenerateTraces(10)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(b, merged, 1) + } +} + +func BenchmarkSplittingBasedOnItemCountManyTracesSlightlyAboveLimit(b *testing.B) { + // Every incoming request results in a split. 
+ cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + merged := []Request{&tracesRequest{td: testdata.GenerateTraces(0)}} + for j := 0; j < 10; j++ { + lr2 := &tracesRequest{td: testdata.GenerateTraces(10001)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(b, merged, 11) + } +} + +func BenchmarkSplittingBasedOnItemCountManyTracesSlightlyBelowLimit(b *testing.B) { + // Every incoming request results in a split. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + merged := []Request{&tracesRequest{td: testdata.GenerateTraces(0)}} + for j := 0; j < 10; j++ { + lr2 := &tracesRequest{td: testdata.GenerateTraces(9999)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + } + assert.Len(b, merged, 10) + } +} + +func BenchmarkSplittingBasedOnItemCountHugeTraces(b *testing.B) { + // One request splits into many batches. + cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000} + for i := 0; i < b.N; i++ { + merged := []Request{&tracesRequest{td: testdata.GenerateTraces(0)}} + lr2 := &tracesRequest{td: testdata.GenerateTraces(100000)} + res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2) + merged = append(merged[0:len(merged)-1], res...) + assert.Len(b, merged, 10) + } +}