Skip to content

Commit

Permalink
Edits according to feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Oct 18, 2024
1 parent 16feb32 commit 075207b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/testdata"
)

Expand All @@ -30,7 +28,7 @@ func TestMergeProfiles(t *testing.T) {
}

func TestMergeProfilesInvalidInput(t *testing.T) {
pr1 := &tracesRequest{td: testdata.GenerateTraces(2)}
pr1 := &dummyRequest{}
pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
_, err := pr2.Merge(context.Background(), pr1)
assert.Error(t, err)
Expand Down Expand Up @@ -131,7 +129,7 @@ func TestMergeSplitProfiles(t *testing.T) {
}

func TestMergeSplitProfilesInvalidInput(t *testing.T) {
r1 := &tracesRequest{td: testdata.GenerateTraces(2)}
r1 := &dummyRequest{}
r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)}
_, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1)
assert.Error(t, err)
Expand All @@ -146,25 +144,23 @@ func TestExtractProfiles(t *testing.T) {
}
}

type tracesRequest struct {
td ptrace.Traces
pusher consumer.ConsumeTracesFunc
// dummyRequest implements BatchRequest. It is for checking that merging two request types would fail
type dummyRequest struct {
}

func (req *tracesRequest) Export(ctx context.Context) error {
return req.pusher(ctx, req.td)
func (req *dummyRequest) Export(_ context.Context) error {
return nil
}

func (req *tracesRequest) ItemsCount() int {
return req.td.SpanCount()
func (req *dummyRequest) ItemsCount() int {
return 1
}

func (req *tracesRequest) Merge(_ context.Context, _ exporterhelper.BatchRequest) (exporterhelper.BatchRequest, error) {
func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.BatchRequest) (exporterhelper.BatchRequest, error) {
return nil, nil
}

// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
func (req *tracesRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.BatchRequest) (
func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.BatchRequest) (
[]exporterhelper.BatchRequest, error) {
return nil, nil
}
10 changes: 8 additions & 2 deletions exporter/exporterhelper/internal/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -141,10 +142,15 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
return bs.NextSender.Send(ctx, req)
}

batchReq, ok := req.(internal.BatchRequest)
if !ok {
errors.New("Incoming request does not implement BatchRequest interface.")
}

Check warning on line 148 in exporter/exporterhelper/internal/batch_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/batch_sender.go#L147-L148

Added lines #L147 - L148 were not covered by tests

if bs.cfg.MaxSizeItems > 0 {
return bs.sendMergeSplitBatch(ctx, req.(internal.BatchRequest))
return bs.sendMergeSplitBatch(ctx, batchReq)
}
return bs.sendMergeBatch(ctx, req.(internal.BatchRequest))
return bs.sendMergeBatch(ctx, batchReq)
}

// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
Expand Down
13 changes: 13 additions & 0 deletions exporter/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@ type Request interface {
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type BatchRequest interface {
Request
// Merge() is a function that merges this request with another one into a single request.
// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is
// marked as not mutable.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Merge(context.Context, BatchRequest) (BatchRequest, error)
// MergeSplit() is a function that merge and/or splits this request with another one into multiple requests based on the
// configured limit provided in MaxSizeConfig.
// All the returned requests MUST have a number of items that does not exceed the maximum number of items.
// Size of the last returned request MUST be less or equal than the size of any other returned request.
// The original request MUST not be mutated if error is returned after mutation or if the exporter is
// marked as not mutable. The length of the returned slice MUST not be 0.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, BatchRequest) ([]BatchRequest, error)
}

Expand Down

0 comments on commit 075207b

Please sign in to comment.