[chore][exporter][batcher] Improve exporter.request merge splitting performance by caching items count #12136

Merged 1 commit on Jan 22, 2025.
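The change is the same for all three signal types: each request caches its item count, computes it lazily on first use, and MergeSplit keeps the cache in sync by hand instead of re-walking the pdata payload on every call. Below is a minimal, self-contained sketch of the pattern; cachedRequest and its items slice are illustrative stand-ins, not the real exporterhelper types.

package main

import "fmt"

// cachedRequest is a simplified stand-in for logsRequest / metricsRequest /
// tracesRequest; the items slice stands in for the pdata payload.
type cachedRequest struct {
    items            []string
    cachedItemsCount int // -1 means "not computed yet"
}

func newCachedRequest(items []string) *cachedRequest {
    return &cachedRequest{items: items, cachedItemsCount: -1}
}

// ItemsCount walks the payload only on the first call and serves the cached
// value afterwards. Any code that mutates the payload must update the cache.
func (r *cachedRequest) ItemsCount() int {
    if r.cachedItemsCount == -1 {
        r.cachedItemsCount = len(r.items) // stands in for LogRecordCount() etc.
    }
    return r.cachedItemsCount
}

func (r *cachedRequest) setCachedItemsCount(count int) {
    r.cachedItemsCount = count
}

func main() {
    req := newCachedRequest([]string{"a", "b", "c"})
    fmt.Println(req.ItemsCount()) // 3: counted once, cached afterwards
}

Note that the sentinel has to be -1 rather than the zero value: after a merge, the emptied source request legitimately caches 0 items, so 0 must stay distinguishable from "not computed yet".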
19 changes: 14 additions & 5 deletions exporter/exporterhelper/logs.go
@@ -25,14 +25,16 @@ var (
)

type logsRequest struct {
-    ld     plog.Logs
-    pusher consumer.ConsumeLogsFunc
+    ld               plog.Logs
+    pusher           consumer.ConsumeLogsFunc
+    cachedItemsCount int
}

func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
    return &logsRequest{
-        ld:     ld,
-        pusher: pusher,
+        ld:               ld,
+        pusher:           pusher,
+        cachedItemsCount: -1,
    }
}

@@ -63,7 +65,14 @@ func (req *logsRequest) Export(ctx context.Context) error {
}

func (req *logsRequest) ItemsCount() int {
-    return req.ld.LogRecordCount()
+    if req.cachedItemsCount == -1 {
+        req.cachedItemsCount = req.ld.LogRecordCount()
+    }
+    return req.cachedItemsCount
}

+func (req *logsRequest) setCachedItemsCount(count int) {
+    req.cachedItemsCount = count
+}
+
type logsExporter struct {
15 changes: 11 additions & 4 deletions exporter/exporterhelper/logs_batch.go
@@ -38,29 +38,36 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, …
            continue
        }

-        srcCount := srcReq.ld.LogRecordCount()
+        srcCount := srcReq.ItemsCount()
        if srcCount <= capacityLeft {
            if destReq == nil {
                destReq = srcReq
            } else {
                srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
+                destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
+                srcReq.setCachedItemsCount(0)
            }
            capacityLeft -= srcCount
            continue
        }

        for {
            extractedLogs := extractLogs(srcReq.ld, capacityLeft)
-            if extractedLogs.LogRecordCount() == 0 {
+            extractedCount := extractedLogs.LogRecordCount()
+            if extractedCount == 0 {
                break
            }
-            capacityLeft -= extractedLogs.LogRecordCount()

            if destReq == nil {
-                destReq = newLogsRequest(extractedLogs, srcReq.pusher).(*logsRequest)
+                destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
            } else {
                extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
+                destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
+                srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
            }

+            // Create new batch once capacity is reached.
+            capacityLeft -= extractedCount
            if capacityLeft == 0 {
                res = append(res, destReq)
                destReq = nil
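The cache bookkeeping above relies on one invariant: by the time MergeSplit mutates a payload (MoveAndAppendTo, extractLogs), ItemsCount() has already been called on every request involved, so both caches are warm and can be updated arithmetically instead of by recounting. A simplified sketch of the merge step under that assumption (stand-in types again, not the real exporterhelper API):

package main

import "fmt"

type request struct {
    items            []string
    cachedItemsCount int // -1 means "not computed yet"
}

func (r *request) ItemsCount() int {
    if r.cachedItemsCount == -1 {
        r.cachedItemsCount = len(r.items)
    }
    return r.cachedItemsCount
}

// merge moves src's payload into dst and mirrors the move in both caches,
// the way the setCachedItemsCount calls in the diff above do.
func merge(dst, src *request) {
    // Warm both caches before mutating anything; recounting after the move
    // would double-count whenever a cache happened to be cold.
    dstCount, srcCount := dst.ItemsCount(), src.ItemsCount()
    dst.items = append(dst.items, src.items...) // stands in for MoveAndAppendTo
    src.items = nil
    dst.cachedItemsCount = dstCount + srcCount
    src.cachedItemsCount = 0
}

func main() {
    dst := &request{items: []string{"a", "b"}, cachedItemsCount: -1}
    src := &request{items: []string{"c"}, cachedItemsCount: -1}
    merge(dst, src)
    fmt.Println(dst.ItemsCount(), src.ItemsCount()) // 3 0
}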
42 changes: 41 additions & 1 deletion exporter/exporterhelper/logs_batch_test.go
Expand Up @@ -123,7 +123,7 @@ func TestMergeSplitLogs(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i, r := range res {
assert.Equal(t, tt.expected[i], r.(*logsRequest))
assert.Equal(t, tt.expected[i].ld, r.(*logsRequest).ld)
}
})
}
Expand Down Expand Up @@ -152,3 +152,43 @@ func TestExtractLogs(t *testing.T) {
assert.Equal(t, 10-i, ld.LogRecordCount())
}
}

func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) {
    // All requests merge into a single batch.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&logsRequest{ld: testdata.GenerateLogs(10), cachedItemsCount: -1}}
        for j := 0; j < 1000; j++ {
            lr2 := &logsRequest{ld: testdata.GenerateLogs(10), cachedItemsCount: -1}
            res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
            merged = append(merged[0:len(merged)-1], res...)
        }
        assert.Len(b, merged, 1)
    }
}

func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) {
    // Every incoming request results in a split.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&logsRequest{ld: testdata.GenerateLogs(0), cachedItemsCount: -1}}
        for j := 0; j < 10; j++ {
            lr2 := &logsRequest{ld: testdata.GenerateLogs(10001), cachedItemsCount: -1}
            res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
            merged = append(merged[0:len(merged)-1], res...)
        }
        assert.Len(b, merged, 11)
    }
}

func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) {
    // One request splits into many batches.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&logsRequest{ld: testdata.GenerateLogs(0), cachedItemsCount: -1}}
        lr2 := &logsRequest{ld: testdata.GenerateLogs(100000), cachedItemsCount: -1}
        res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
        merged = append(merged[0:len(merged)-1], res...)
        assert.Len(b, merged, 10)
    }
}
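The benchmarks cover three regimes: many small requests that only merge, requests slightly above the limit so every call splits, and one huge request that fans out into many batches (the metrics and traces files below mirror the same three). To measure the effect of the caching, something like the following should work from the repository root (standard Go tooling; the package path assumes the collector repo layout), run before and after the change, with the outputs compared via benchstat:

go test -run='^$' -bench='BenchmarkSplittingBasedOnItemCount' ./exporter/exporterhelper/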
19 changes: 14 additions & 5 deletions exporter/exporterhelper/metrics.go
@@ -25,14 +25,16 @@ var (
)

type metricsRequest struct {
-    md     pmetric.Metrics
-    pusher consumer.ConsumeMetricsFunc
+    md               pmetric.Metrics
+    pusher           consumer.ConsumeMetricsFunc
+    cachedItemsCount int
}

func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request {
    return &metricsRequest{
-        md:     md,
-        pusher: pusher,
+        md:               md,
+        pusher:           pusher,
+        cachedItemsCount: -1,
    }
}

@@ -63,7 +65,14 @@ func (req *metricsRequest) Export(ctx context.Context) error {
}

func (req *metricsRequest) ItemsCount() int {
-    return req.md.DataPointCount()
+    if req.cachedItemsCount == -1 {
+        req.cachedItemsCount = req.md.DataPointCount()
+    }
+    return req.cachedItemsCount
}

+func (req *metricsRequest) setCachedItemsCount(count int) {
+    req.cachedItemsCount = count
+}
+
type metricsExporter struct {
15 changes: 11 additions & 4 deletions exporter/exporterhelper/metrics_batch.go
@@ -38,11 +38,13 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, …
            continue
        }

-        srcCount := srcReq.md.DataPointCount()
+        srcCount := srcReq.ItemsCount()
        if srcCount <= capacityLeft {
            if destReq == nil {
                destReq = srcReq
            } else {
+                destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
+                srcReq.setCachedItemsCount(0)
                srcReq.md.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics())
            }
            capacityLeft -= srcCount
@@ -51,16 +53,21 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, …

        for {
            extractedMetrics := extractMetrics(srcReq.md, capacityLeft)
-            if extractedMetrics.DataPointCount() == 0 {
+            extractedCount := extractedMetrics.DataPointCount()
+            if extractedCount == 0 {
                break
            }
-            capacityLeft -= extractedMetrics.DataPointCount()

            if destReq == nil {
-                destReq = newMetricsRequest(extractedMetrics, srcReq.pusher).(*metricsRequest)
+                destReq = &metricsRequest{md: extractedMetrics, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
            } else {
+                destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
+                srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
                extractedMetrics.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics())
            }

+            // Create new batch once capacity is reached.
+            capacityLeft -= extractedCount
            if capacityLeft == 0 {
                res = append(res, destReq)
                destReq = nil
42 changes: 41 additions & 1 deletion exporter/exporterhelper/metrics_batch_test.go
Expand Up @@ -123,7 +123,7 @@ func TestMergeSplitMetrics(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i := range res {
assert.Equal(t, tt.expected[i], res[i].(*metricsRequest))
assert.Equal(t, tt.expected[i].md, res[i].(*metricsRequest).md)
}
})
}
Expand Down Expand Up @@ -159,3 +159,43 @@ func TestExtractMetricsInvalidMetric(t *testing.T) {
assert.Equal(t, testdata.GenerateMetricsMetricTypeInvalid(), extractedMetrics)
assert.Equal(t, 0, md.ResourceMetrics().Len())
}

func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) {
    // All requests merge into a single batch.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&metricsRequest{md: testdata.GenerateMetrics(10), cachedItemsCount: -1}}
        for j := 0; j < 1000; j++ {
            lr2 := &metricsRequest{md: testdata.GenerateMetrics(10), cachedItemsCount: -1}
            res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
            merged = append(merged[0:len(merged)-1], res...)
        }
        assert.Len(b, merged, 1)
    }
}

func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing.B) {
    // Every incoming request results in a split.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&metricsRequest{md: testdata.GenerateMetrics(0), cachedItemsCount: -1}}
        for j := 0; j < 10; j++ {
            lr2 := &metricsRequest{md: testdata.GenerateMetrics(10001), cachedItemsCount: -1}
            res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
            merged = append(merged[0:len(merged)-1], res...)
        }
        assert.Len(b, merged, 11)
    }
}

func BenchmarkSplittingBasedOnItemCountHugeMetrics(b *testing.B) {
    // One request splits into many batches.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&metricsRequest{md: testdata.GenerateMetrics(0), cachedItemsCount: -1}}
        lr2 := &metricsRequest{md: testdata.GenerateMetrics(100000), cachedItemsCount: -1}
        res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
        merged = append(merged[0:len(merged)-1], res...)
        assert.Len(b, merged, 10)
    }
}
19 changes: 14 additions & 5 deletions exporter/exporterhelper/traces.go
@@ -25,14 +25,16 @@ var (
)

type tracesRequest struct {
-    td     ptrace.Traces
-    pusher consumer.ConsumeTracesFunc
+    td               ptrace.Traces
+    pusher           consumer.ConsumeTracesFunc
+    cachedItemsCount int
}

func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) Request {
    return &tracesRequest{
-        td:     td,
-        pusher: pusher,
+        td:               td,
+        pusher:           pusher,
+        cachedItemsCount: -1,
    }
}

@@ -63,7 +65,14 @@ func (req *tracesRequest) Export(ctx context.Context) error {
}

func (req *tracesRequest) ItemsCount() int {
-    return req.td.SpanCount()
+    if req.cachedItemsCount == -1 {
+        req.cachedItemsCount = req.td.SpanCount()
+    }
+    return req.cachedItemsCount
}

+func (req *tracesRequest) setCachedItemsCount(count int) {
+    req.cachedItemsCount = count
+}
+
type tracesExporter struct {
15 changes: 11 additions & 4 deletions exporter/exporterhelper/traces_batch.go
@@ -38,11 +38,13 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, …
            continue
        }

-        srcCount := srcReq.td.SpanCount()
+        srcCount := srcReq.ItemsCount()
        if srcCount <= capacityLeft {
            if destReq == nil {
                destReq = srcReq
            } else {
+                destReq.setCachedItemsCount(destReq.ItemsCount() + srcCount)
+                srcReq.setCachedItemsCount(0)
                srcReq.td.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans())
            }
            capacityLeft -= srcCount
@@ -51,16 +53,21 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, …

        for {
            extractedTraces := extractTraces(srcReq.td, capacityLeft)
-            if extractedTraces.SpanCount() == 0 {
+            extractedCount := extractedTraces.SpanCount()
+            if extractedCount == 0 {
                break
            }
-            capacityLeft -= extractedTraces.SpanCount()

            if destReq == nil {
-                destReq = newTracesRequest(extractedTraces, srcReq.pusher).(*tracesRequest)
+                destReq = &tracesRequest{td: extractedTraces, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
            } else {
+                destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
+                srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
                extractedTraces.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans())
            }

+            // Create new batch once capacity is reached.
+            capacityLeft -= extractedCount
            if capacityLeft == 0 {
                res = append(res, destReq)
                destReq = nil
42 changes: 41 additions & 1 deletion exporter/exporterhelper/traces_batch_test.go
Expand Up @@ -130,7 +130,7 @@ func TestMergeSplitTraces(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i := range res {
assert.Equal(t, tt.expected[i], res[i].(*tracesRequest))
assert.Equal(t, tt.expected[i].td, res[i].(*tracesRequest).td)
}
})
}
Expand Down Expand Up @@ -159,3 +159,43 @@ func TestExtractTraces(t *testing.T) {
assert.Equal(t, 10-i, td.SpanCount())
}
}

func BenchmarkSplittingBasedOnItemCountManySmallTraces(b *testing.B) {
    // All requests merge into a single batch.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&tracesRequest{td: testdata.GenerateTraces(10), cachedItemsCount: -1}}
        for j := 0; j < 1000; j++ {
            lr2 := &tracesRequest{td: testdata.GenerateTraces(10), cachedItemsCount: -1}
            res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
            merged = append(merged[0:len(merged)-1], res...)
        }
        assert.Len(b, merged, 1)
    }
}

func BenchmarkSplittingBasedOnItemCountManyTracesSlightlyAboveLimit(b *testing.B) {
    // Every incoming request results in a split.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&tracesRequest{td: testdata.GenerateTraces(0), cachedItemsCount: -1}}
        for j := 0; j < 10; j++ {
            lr2 := &tracesRequest{td: testdata.GenerateTraces(10001), cachedItemsCount: -1}
            res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
            merged = append(merged[0:len(merged)-1], res...)
        }
        assert.Len(b, merged, 11)
    }
}

func BenchmarkSplittingBasedOnItemCountHugeTraces(b *testing.B) {
    // One request splits into many batches.
    cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
    for i := 0; i < b.N; i++ {
        merged := []Request{&tracesRequest{td: testdata.GenerateTraces(0), cachedItemsCount: -1}}
        lr2 := &tracesRequest{td: testdata.GenerateTraces(100000), cachedItemsCount: -1}
        res, _ := merged[len(merged)-1].MergeSplit(context.Background(), cfg, lr2)
        merged = append(merged[0:len(merged)-1], res...)
        assert.Len(b, merged, 10)
    }
}