From 869dad4022c7ebcc50247d6967c3de12ddf79ce4 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 4 Aug 2022 12:33:07 -0700 Subject: [PATCH] Remove unnecessary internal package, used only in one place (#5820) Signed-off-by: Bogdan --- .../err_or_sink_consumer.go | 81 ------------------- receiver/otlpreceiver/otlp_test.go | 69 ++++++++++++++-- 2 files changed, 63 insertions(+), 87 deletions(-) delete mode 100644 internal/internalconsumertest/err_or_sink_consumer.go diff --git a/internal/internalconsumertest/err_or_sink_consumer.go b/internal/internalconsumertest/err_or_sink_consumer.go deleted file mode 100644 index 6936e07246d..00000000000 --- a/internal/internalconsumertest/err_or_sink_consumer.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internalconsumertest // import "go.opentelemetry.io/collector/internal/internalconsumertest" - -import ( - "context" - "sync" - - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -type ErrOrSinkConsumer struct { - *consumertest.TracesSink - *consumertest.MetricsSink - mu sync.Mutex - consumeError error // to be returned by ConsumeTraces, if set -} - -// SetConsumeError sets an error that will be returned by the Consume function. -func (esc *ErrOrSinkConsumer) SetConsumeError(err error) { - esc.mu.Lock() - defer esc.mu.Unlock() - esc.consumeError = err -} - -func (esc *ErrOrSinkConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -// ConsumeTraces stores traces to this sink. -func (esc *ErrOrSinkConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - esc.mu.Lock() - defer esc.mu.Unlock() - - if esc.consumeError != nil { - return esc.consumeError - } - - return esc.TracesSink.ConsumeTraces(ctx, td) -} - -// ConsumeMetrics stores metrics to this sink. -func (esc *ErrOrSinkConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - esc.mu.Lock() - defer esc.mu.Unlock() - - if esc.consumeError != nil { - return esc.consumeError - } - - return esc.MetricsSink.ConsumeMetrics(ctx, md) -} - -// Reset deletes any stored in the sinks, resets error to nil. -func (esc *ErrOrSinkConsumer) Reset() { - esc.mu.Lock() - defer esc.mu.Unlock() - - esc.consumeError = nil - if esc.TracesSink != nil { - esc.TracesSink.Reset() - } - if esc.MetricsSink != nil { - esc.MetricsSink.Reset() - } -} diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index b5fdede1048..908045fad94 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "net" "net/http" + "sync" "testing" "time" @@ -45,11 +46,11 @@ import ( "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/internal/internalconsumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/obsreport/obsreporttest" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" @@ -161,7 +162,7 @@ func TestJsonHttp(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} + sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} ocr := newHTTPReceiver(t, addr, sink, nil) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") @@ -333,7 +334,7 @@ func TestHandleInvalidRequests(t *testing.T) { require.NoError(t, err) } -func testHTTPJSONRequest(t *testing.T, url string, sink *internalconsumertest.ErrOrSinkConsumer, encoding string, expectedErr error) { +func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, encoding string, expectedErr error) { var buf *bytes.Buffer var err error switch encoding { @@ -412,7 +413,7 @@ func TestProtoHttp(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - tSink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} + tSink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} ocr := newHTTPReceiver(t, addr, tSink, consumertest.NewNop()) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") @@ -462,7 +463,7 @@ func createHTTPProtobufRequest( func testHTTPProtobufRequest( t *testing.T, url string, - tSink *internalconsumertest.ErrOrSinkConsumer, + tSink *errOrSinkConsumer, encoding string, traceBytes []byte, expectedErr error, @@ -665,7 +666,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - sink := &internalconsumertest.ErrOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} + sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} ocr := newGRPCReceiver(t, exporter.receiverTag, addr, sink, nil) require.NotNil(t, ocr) @@ -996,3 +997,59 @@ func exportTraces(cc *grpc.ClientConn, td ptrace.Traces) error { return err } + +type errOrSinkConsumer struct { + *consumertest.TracesSink + *consumertest.MetricsSink + mu sync.Mutex + consumeError error // to be returned by ConsumeTraces, if set +} + +// SetConsumeError sets an error that will be returned by the Consume function. +func (esc *errOrSinkConsumer) SetConsumeError(err error) { + esc.mu.Lock() + defer esc.mu.Unlock() + esc.consumeError = err +} + +func (esc *errOrSinkConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// ConsumeTraces stores traces to this sink. +func (esc *errOrSinkConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + esc.mu.Lock() + defer esc.mu.Unlock() + + if esc.consumeError != nil { + return esc.consumeError + } + + return esc.TracesSink.ConsumeTraces(ctx, td) +} + +// ConsumeMetrics stores metrics to this sink. +func (esc *errOrSinkConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + esc.mu.Lock() + defer esc.mu.Unlock() + + if esc.consumeError != nil { + return esc.consumeError + } + + return esc.MetricsSink.ConsumeMetrics(ctx, md) +} + +// Reset deletes any stored in the sinks, resets error to nil. +func (esc *errOrSinkConsumer) Reset() { + esc.mu.Lock() + defer esc.mu.Unlock() + + esc.consumeError = nil + if esc.TracesSink != nil { + esc.TracesSink.Reset() + } + if esc.MetricsSink != nil { + esc.MetricsSink.Reset() + } +}