Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][exporter/elasticsearch] Add benchmark for logs consumer #33035

Merged
merged 6 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

var emptyLogs plog.Logs
lahsivjar marked this conversation as resolved.
Show resolved Hide resolved

type esDataReceiver struct {
testbed.DataReceiverBase
receiver receiver.Logs
endpoint string
Endpoint string
lahsivjar marked this conversation as resolved.
Show resolved Hide resolved
}

func newElasticsearchDataReceiver(t testing.TB) testbed.DataReceiver {
func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver {
return &esDataReceiver{
DataReceiverBase: testbed.DataReceiverBase{},
endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
Endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
}
}

Expand All @@ -47,7 +49,7 @@ func (es *esDataReceiver) Start(_ consumer.Traces, _ consumer.Metrics, lc consum
receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
)
cfg := factory.CreateDefaultConfig().(*config)
cfg.ESEndpoint = es.endpoint
cfg.ESEndpoint = es.Endpoint

var err error
set := receivertest.NewNopCreateSettings()
Expand Down Expand Up @@ -80,7 +82,7 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
enabled: true
max_requests: 10000
`
return fmt.Sprintf(cfgFormat, es.endpoint)
return fmt.Sprintf(cfgFormat, es.Endpoint)
}

func (es *esDataReceiver) ProtocolName() string {
Expand Down Expand Up @@ -129,13 +131,10 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume
for k, item := range itemMap {
// Ideally bulk request should be converted to log record
// however, since we only assert count for now there is no
// need to do the actual translation.
logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords().AppendEmpty()

if err := next.ConsumeLogs(context.Background(), logs); err != nil {
// need to do the actual translation. We use a pre-initialized
// empty plog.Logs to reduce allocation impact on tests and
// benchmarks due to this.
if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil {
response.HasErrors = true
item.Status = http.StatusTooManyRequests
item.Error.Type = "simulated_es_error"
Expand Down Expand Up @@ -175,3 +174,10 @@ func (es *mockESReceiver) Start(_ context.Context, _ component.Host) error {
func (es *mockESReceiver) Shutdown(ctx context.Context) error {
return es.server.Shutdown(ctx)
}

func init() {
emptyLogs = plog.NewLogs()
emptyLogs.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords().AppendEmpty()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package integrationtest

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
)

func BenchmarkLogsExporter(b *testing.B) {
for _, tc := range []struct {
name string
batchSize int
}{
{name: "small_batch", batchSize: 10},
{name: "medium_batch", batchSize: 100},
{name: "large_batch", batchSize: 1000},
{name: "xlarge_batch", batchSize: 10000},
} {
b.Run(tc.name, func(b *testing.B) {
benchmarkLogs(b, tc.batchSize)
})
}
}

func benchmarkLogs(b *testing.B, batchSize int) {
var generatedCount, observedCount atomic.Uint64

receiver := newElasticsearchDataReceiver(b)
factory := elasticsearchexporter.NewFactory()

cfg := factory.CreateDefaultConfig().(*elasticsearchexporter.Config)
cfg.Endpoints = []string{receiver.Endpoint}
cfg.Flush.Interval = 10 * time.Millisecond
cfg.NumWorkers = 1

exporter, err := factory.CreateLogsExporter(
context.Background(),
exportertest.NewNopCreateSettings(),
cfg,
)
require.NoError(b, err)

provider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize})
provider.SetLoadGeneratorCounters(&generatedCount)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logsConsumer, err := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error {
observedCount.Add(1)
return nil
})
require.NoError(b, err)

require.NoError(b, receiver.Start(nil, nil, logsConsumer))
defer receiver.Stop()

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
logs, _ := provider.GenerateLogs()
b.StartTimer()
exporter.ConsumeLogs(ctx, logs)
}
require.NoError(b, exporter.Shutdown(ctx))
require.Equal(b, generatedCount.Load(), observedCount.Load(), "failed to send all logs to backend")
}
Loading