Skip to content

Commit

Permalink
Merge branch 'main' into rmtranslation
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Boten authored Aug 4, 2022
2 parents 437354c + 869dad4 commit 249d3ce
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 87 deletions.
81 changes: 0 additions & 81 deletions internal/internalconsumertest/err_or_sink_consumer.go

This file was deleted.

69 changes: 63 additions & 6 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"net/http"
"sync"
"testing"
"time"

Expand All @@ -45,11 +46,11 @@ import (
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/internalconsumertest"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/internal/testutil"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestJsonHttp(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
ocr := newHTTPReceiver(t, addr, sink, nil)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
Expand Down Expand Up @@ -333,7 +334,7 @@ func TestHandleInvalidRequests(t *testing.T) {
require.NoError(t, err)
}

func testHTTPJSONRequest(t *testing.T, url string, sink *internalconsumertest.ErrOrSinkConsumer, encoding string, expectedErr error) {
func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, encoding string, expectedErr error) {
var buf *bytes.Buffer
var err error
switch encoding {
Expand Down Expand Up @@ -412,7 +413,7 @@ func TestProtoHttp(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
tSink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
tSink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
ocr := newHTTPReceiver(t, addr, tSink, consumertest.NewNop())

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
Expand Down Expand Up @@ -462,7 +463,7 @@ func createHTTPProtobufRequest(
func testHTTPProtobufRequest(
t *testing.T,
url string,
tSink *internalconsumertest.ErrOrSinkConsumer,
tSink *errOrSinkConsumer,
encoding string,
traceBytes []byte,
expectedErr error,
Expand Down Expand Up @@ -665,7 +666,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}

ocr := newGRPCReceiver(t, exporter.receiverTag, addr, sink, nil)
require.NotNil(t, ocr)
Expand Down Expand Up @@ -996,3 +997,59 @@ func exportTraces(cc *grpc.ClientConn, td ptrace.Traces) error {

return err
}

type errOrSinkConsumer struct {
*consumertest.TracesSink
*consumertest.MetricsSink
mu sync.Mutex
consumeError error // to be returned by ConsumeTraces, if set
}

// SetConsumeError sets an error that will be returned by the Consume function.
func (esc *errOrSinkConsumer) SetConsumeError(err error) {
esc.mu.Lock()
defer esc.mu.Unlock()
esc.consumeError = err
}

func (esc *errOrSinkConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// ConsumeTraces stores traces to this sink.
func (esc *errOrSinkConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
esc.mu.Lock()
defer esc.mu.Unlock()

if esc.consumeError != nil {
return esc.consumeError
}

return esc.TracesSink.ConsumeTraces(ctx, td)
}

// ConsumeMetrics stores metrics to this sink.
func (esc *errOrSinkConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
esc.mu.Lock()
defer esc.mu.Unlock()

if esc.consumeError != nil {
return esc.consumeError
}

return esc.MetricsSink.ConsumeMetrics(ctx, md)
}

// Reset deletes any stored in the sinks, resets error to nil.
func (esc *errOrSinkConsumer) Reset() {
esc.mu.Lock()
defer esc.mu.Unlock()

esc.consumeError = nil
if esc.TracesSink != nil {
esc.TracesSink.Reset()
}
if esc.MetricsSink != nil {
esc.MetricsSink.Reset()
}
}

0 comments on commit 249d3ce

Please sign in to comment.