diff --git a/consumer/consumertest/sink.go b/consumer/consumertest/sink.go new file mode 100644 index 00000000000..d585d5bafab --- /dev/null +++ b/consumer/consumertest/sink.go @@ -0,0 +1,183 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumertest + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/pdata" +) + +type baseErrorConsumer struct { + mu sync.Mutex + consumeError error // to be returned by ConsumeTraces, if set +} + +// SetConsumeError sets an error that will be returned by the Consume function. +func (bec *baseErrorConsumer) SetConsumeError(err error) { + bec.mu.Lock() + defer bec.mu.Unlock() + bec.consumeError = err +} + +// SinkTraces acts as a trace receiver for use in tests. +type SinkTraces struct { + baseErrorConsumer + traces []pdata.Traces + spansCount int +} + +var _ consumer.TraceConsumer = (*SinkTraces)(nil) + +// ConsumeTraceData stores traces for tests. +func (ste *SinkTraces) ConsumeTraces(_ context.Context, td pdata.Traces) error { + ste.mu.Lock() + defer ste.mu.Unlock() + + if ste.consumeError != nil { + return ste.consumeError + } + + ste.traces = append(ste.traces, td) + ste.spansCount += td.SpanCount() + + return nil +} + +// AllTraces returns the traces sent to the test sink. +func (ste *SinkTraces) AllTraces() []pdata.Traces { + ste.mu.Lock() + defer ste.mu.Unlock() + + copyTraces := make([]pdata.Traces, len(ste.traces)) + copy(copyTraces, ste.traces) + return copyTraces +} + +// SpansCount return the number of spans sent to the test sing. +func (ste *SinkTraces) SpansCount() int { + ste.mu.Lock() + defer ste.mu.Unlock() + return ste.spansCount +} + +// Reset deletes any existing metrics. +func (ste *SinkTraces) Reset() { + ste.mu.Lock() + defer ste.mu.Unlock() + + ste.traces = nil + ste.spansCount = 0 +} + +// SinkMetrics acts as a metrics receiver for use in tests. +type SinkMetrics struct { + baseErrorConsumer + metrics []pdata.Metrics + metricsCount int +} + +var _ consumer.MetricsConsumer = (*SinkMetrics)(nil) + +// ConsumeMetricsData stores traces for tests. +func (sme *SinkMetrics) ConsumeMetrics(_ context.Context, md pdata.Metrics) error { + sme.mu.Lock() + defer sme.mu.Unlock() + if sme.consumeError != nil { + return sme.consumeError + } + + sme.metrics = append(sme.metrics, md) + sme.metricsCount += md.MetricCount() + + return nil +} + +// AllMetrics returns the metrics sent to the test sink. +func (sme *SinkMetrics) AllMetrics() []pdata.Metrics { + sme.mu.Lock() + defer sme.mu.Unlock() + + copyMetrics := make([]pdata.Metrics, len(sme.metrics)) + copy(copyMetrics, sme.metrics) + return copyMetrics +} + +// MetricsCount return the number of metrics sent to the test sing. +func (sme *SinkMetrics) MetricsCount() int { + sme.mu.Lock() + defer sme.mu.Unlock() + return sme.metricsCount +} + +// Reset deletes any existing metrics. +func (sme *SinkMetrics) Reset() { + sme.mu.Lock() + defer sme.mu.Unlock() + + sme.metrics = nil + sme.metricsCount = 0 +} + +// SinkLogs acts as a metrics receiver for use in tests. +type SinkLogs struct { + baseErrorConsumer + logs []pdata.Logs + logRecordsCount int +} + +var _ consumer.LogsConsumer = (*SinkLogs)(nil) + +// ConsumeLogData stores traces for tests. +func (sle *SinkLogs) ConsumeLogs(_ context.Context, ld pdata.Logs) error { + sle.mu.Lock() + defer sle.mu.Unlock() + if sle.consumeError != nil { + return sle.consumeError + } + + sle.logs = append(sle.logs, ld) + sle.logRecordsCount += ld.LogRecordCount() + + return nil +} + +// AllLog returns the metrics sent to the test sink. +func (sle *SinkLogs) AllLogs() []pdata.Logs { + sle.mu.Lock() + defer sle.mu.Unlock() + + copyLogs := make([]pdata.Logs, len(sle.logs)) + copy(copyLogs, sle.logs) + return copyLogs +} + +// LogRecordsCount return the number of log records sent to the test sing. +func (sle *SinkLogs) LogRecordsCount() int { + sle.mu.Lock() + defer sle.mu.Unlock() + return sle.logRecordsCount +} + +// Reset deletes any existing logs. +func (sle *SinkLogs) Reset() { + sle.mu.Lock() + defer sle.mu.Unlock() + + sle.logs = nil + sle.logRecordsCount = 0 +} diff --git a/consumer/consumertest/sink_test.go b/consumer/consumertest/sink_test.go new file mode 100644 index 00000000000..371740a2ec8 --- /dev/null +++ b/consumer/consumertest/sink_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumertest + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/data/testdata" +) + +func TestSinkTraces(t *testing.T) { + sink := new(SinkTraces) + td := testdata.GenerateTraceDataOneSpan() + want := make([]pdata.Traces, 0, 7) + for i := 0; i < 7; i++ { + require.NoError(t, sink.ConsumeTraces(context.Background(), td)) + want = append(want, td) + } + assert.Equal(t, want, sink.AllTraces()) + assert.Equal(t, len(want), sink.SpansCount()) + sink.Reset() + assert.Equal(t, 0, len(sink.AllTraces())) + assert.Equal(t, 0, sink.SpansCount()) +} + +func TestSinkTraces_Error(t *testing.T) { + sink := new(SinkTraces) + sink.SetConsumeError(errors.New("my error")) + td := testdata.GenerateTraceDataOneSpan() + require.Error(t, sink.ConsumeTraces(context.Background(), td)) + assert.Len(t, sink.AllTraces(), 0) + assert.Equal(t, 0, sink.SpansCount()) +} + +func TestSinkMetrics(t *testing.T) { + sink := new(SinkMetrics) + md := testdata.GenerateMetricsOneMetric() + want := make([]pdata.Metrics, 0, 7) + for i := 0; i < 7; i++ { + require.NoError(t, sink.ConsumeMetrics(context.Background(), md)) + want = append(want, md) + } + assert.Equal(t, want, sink.AllMetrics()) + assert.Equal(t, len(want), sink.MetricsCount()) + sink.Reset() + assert.Equal(t, 0, len(sink.AllMetrics())) + assert.Equal(t, 0, sink.MetricsCount()) +} + +func TestSinkMetrics_Error(t *testing.T) { + sink := new(SinkMetrics) + sink.SetConsumeError(errors.New("my error")) + md := testdata.GenerateMetricsOneMetric() + require.Error(t, sink.ConsumeMetrics(context.Background(), md)) + assert.Len(t, sink.AllMetrics(), 0) + assert.Equal(t, 0, sink.MetricsCount()) +} + +func TestSinkLogs(t *testing.T) { + sink := new(SinkLogs) + md := testdata.GenerateLogDataOneLogNoResource() + want := make([]pdata.Logs, 0, 7) + for i := 0; i < 7; i++ { + require.NoError(t, sink.ConsumeLogs(context.Background(), md)) + want = append(want, md) + } + assert.Equal(t, want, sink.AllLogs()) + assert.Equal(t, len(want), sink.LogRecordsCount()) + sink.Reset() + assert.Equal(t, 0, len(sink.AllLogs())) + assert.Equal(t, 0, sink.LogRecordsCount()) +} + +func TestSinkLogs_Error(t *testing.T) { + sink := new(SinkLogs) + sink.SetConsumeError(errors.New("my error")) + ld := testdata.GenerateLogDataOneLogNoResource() + require.Error(t, sink.ConsumeLogs(context.Background(), ld)) + assert.Len(t, sink.AllLogs(), 0) + assert.Equal(t, 0, sink.LogRecordsCount()) +} diff --git a/exporter/exportertest/sink_exporter.go b/exporter/exportertest/sink_exporter.go index e30a84aff15..64550513b35 100644 --- a/exporter/exportertest/sink_exporter.go +++ b/exporter/exportertest/sink_exporter.go @@ -24,6 +24,7 @@ import ( ) // SinkTraceExporter acts as a trace receiver for use in tests. +// Deprecated: Use consumertest.SinkTraces type SinkTraceExporter struct { mu sync.Mutex consumeTraceError error // to be returned by ConsumeTraces, if set @@ -92,6 +93,7 @@ func (ste *SinkTraceExporter) Shutdown(context.Context) error { } // SinkMetricsExporter acts as a metrics receiver for use in tests. +// Deprecated: Use consumertest.SinkMetrics type SinkMetricsExporter struct { mu sync.Mutex consumeMetricsError error // to be returned by ConsumeMetrics, if set @@ -159,6 +161,7 @@ func (sme *SinkMetricsExporter) Shutdown(context.Context) error { } // SinkLogsExporter acts as a metrics receiver for use in tests. +// Deprecated: Use consumertest.SinkLogs type SinkLogsExporter struct { consumeLogError error // to be returned by ConsumeLog, if set mu sync.Mutex diff --git a/exporter/opencensusexporter/opencensus_test.go b/exporter/opencensusexporter/opencensus_test.go index cb994773612..bc5edaed422 100644 --- a/exporter/opencensusexporter/opencensus_test.go +++ b/exporter/opencensusexporter/opencensus_test.go @@ -26,15 +26,15 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/receiver/opencensusreceiver" "go.opentelemetry.io/collector/testutil" ) func TestSendTraces(t *testing.T) { - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) rFactory := opencensusreceiver.NewFactory() rCfg := rFactory.CreateDefaultConfig().(*opencensusreceiver.Config) endpoint := testutil.GetAvailableLocalAddress(t) @@ -133,7 +133,7 @@ func TestSendTraces_AfterStop(t *testing.T) { } func TestSendMetrics(t *testing.T) { - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) rFactory := opencensusreceiver.NewFactory() rCfg := rFactory.CreateDefaultConfig().(*opencensusreceiver.Config) endpoint := testutil.GetAvailableLocalAddress(t) diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 691efe04b61..a0e29491f04 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -30,7 +30,7 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/receiver/otlpreceiver" "go.opentelemetry.io/collector/testutil" @@ -75,8 +75,8 @@ func TestTraceInvalidUrl(t *testing.T) { func TestTraceError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - sink := new(exportertest.SinkTraceExporter) - sink.SetConsumeTraceError(errors.New("my_error")) + sink := new(consumertest.SinkTraces) + sink.SetConsumeError(errors.New("my_error")) startTraceReceiver(t, addr, sink) exp := startTraceExporter(t, "", fmt.Sprintf("http://%s/v1/trace", addr)) @@ -113,7 +113,7 @@ func TestTraceRoundTrip(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) startTraceReceiver(t, addr, sink) exp := startTraceExporter(t, test.baseURL, test.overrideURL) @@ -132,8 +132,8 @@ func TestTraceRoundTrip(t *testing.T) { func TestMetricsError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - sink := new(exportertest.SinkMetricsExporter) - sink.SetConsumeMetricsError(errors.New("my_error")) + sink := new(consumertest.SinkMetrics) + sink.SetConsumeError(errors.New("my_error")) startMetricsReceiver(t, addr, sink) exp := startMetricsExporter(t, "", fmt.Sprintf("http://%s/v1/metrics", addr)) @@ -168,7 +168,7 @@ func TestMetricsRoundTrip(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - sink := new(exportertest.SinkMetricsExporter) + sink := new(consumertest.SinkMetrics) startMetricsReceiver(t, addr, sink) exp := startMetricsExporter(t, test.baseURL, test.overrideURL) @@ -187,8 +187,8 @@ func TestMetricsRoundTrip(t *testing.T) { func TestLogsError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - sink := new(exportertest.SinkLogsExporter) - sink.SetConsumeLogError(errors.New("my_error")) + sink := new(consumertest.SinkLogs) + sink.SetConsumeError(errors.New("my_error")) startLogsReceiver(t, addr, sink) exp := startLogsExporter(t, "", fmt.Sprintf("http://%s/v1/logs", addr)) @@ -223,7 +223,7 @@ func TestLogsRoundTrip(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - sink := new(exportertest.SinkLogsExporter) + sink := new(consumertest.SinkLogs) startLogsReceiver(t, addr, sink) exp := startLogsExporter(t, test.baseURL, test.overrideURL) diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index b907b05843e..9abcbb98617 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -28,13 +28,13 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/data/testdata" ) func TestBatchProcessorSpansDelivered(t *testing.T) { - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()} @@ -74,7 +74,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { } func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 128 @@ -120,7 +120,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { view.Register(views...) defer view.Unregister(views...) - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) cfg := createDefaultConfig().(*Config) sendBatchSize := 20 cfg.SendBatchSize = uint32(sendBatchSize) @@ -177,7 +177,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { } func TestBatchProcessorSentByTimeout(t *testing.T) { - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) cfg := createDefaultConfig().(*Config) sendBatchSize := 100 cfg.SendBatchSize = uint32(sendBatchSize) @@ -230,7 +230,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { Timeout: 3 * time.Second, SendBatchSize: 1000, } - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchTracesProcessor(creationParams, sink, &cfg, configtelemetry.LevelDetailed) @@ -259,7 +259,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { requestCount := 100 metricsPerRequest := 5 - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) @@ -310,7 +310,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) { requestCount := 100 metricsPerRequest := 5 - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) @@ -365,7 +365,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { } requestCount := 5 metricsPerRequest := 10 - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) @@ -412,7 +412,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { } requestCount := 5 metricsPerRequest := 10 - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) @@ -515,7 +515,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { requestCount := 100 logsPerRequest := 5 - sink := &exportertest.SinkLogsExporter{} + sink := new(consumertest.SinkLogs) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) @@ -566,7 +566,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { requestCount := 100 logsPerRequest := 5 - sink := &exportertest.SinkLogsExporter{} + sink := new(consumertest.SinkLogs) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) @@ -621,7 +621,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { } requestCount := 5 logsPerRequest := 10 - sink := &exportertest.SinkLogsExporter{} + sink := new(consumertest.SinkLogs) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) @@ -668,7 +668,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { } requestCount := 5 logsPerRequest := 10 - sink := &exportertest.SinkLogsExporter{} + sink := new(consumertest.SinkLogs) createParams := component.ProcessorCreateParams{Logger: zap.NewNop()} batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed) diff --git a/processor/cloningfanoutconnector_test.go b/processor/cloningfanoutconnector_test.go index f9ffd6caa0a..781f8fe5280 100644 --- a/processor/cloningfanoutconnector_test.go +++ b/processor/cloningfanoutconnector_test.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/data/testdata" ) @@ -35,7 +34,7 @@ func TestTraceProcessorCloningNotMultiplexing(t *testing.T) { func TestTraceProcessorCloningMultiplexing(t *testing.T) { processors := make([]consumer.TraceConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkTraceExporter) + processors[i] = new(consumertest.SinkTraces) } tfc := NewTracesCloningFanOutConnector(processors) @@ -52,7 +51,7 @@ func TestTraceProcessorCloningMultiplexing(t *testing.T) { } for i, p := range processors { - m := p.(*exportertest.SinkTraceExporter) + m := p.(*consumertest.SinkTraces) assert.Equal(t, wantSpansCount, m.SpansCount()) spanOrig := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) allTraces := m.AllTraces() @@ -78,7 +77,7 @@ func TestMetricsProcessorCloningNotMultiplexing(t *testing.T) { func TestMetricsProcessorCloningMultiplexing(t *testing.T) { processors := make([]consumer.MetricsConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkMetricsExporter) + processors[i] = new(consumertest.SinkMetrics) } mfc := NewMetricsCloningFanOutConnector(processors) @@ -95,7 +94,7 @@ func TestMetricsProcessorCloningMultiplexing(t *testing.T) { } for i, p := range processors { - m := p.(*exportertest.SinkMetricsExporter) + m := p.(*consumertest.SinkMetrics) assert.Equal(t, wantMetricsCount, m.MetricsCount()) metricOrig := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0) allMetrics := m.AllMetrics() @@ -121,7 +120,7 @@ func TestLogsProcessorCloningNotMultiplexing(t *testing.T) { func TestLogsProcessorCloningMultiplexing(t *testing.T) { processors := make([]consumer.LogsConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkLogsExporter) + processors[i] = new(consumertest.SinkLogs) } mfc := NewLogsCloningFanOutConnector(processors) @@ -138,7 +137,7 @@ func TestLogsProcessorCloningMultiplexing(t *testing.T) { } for i, p := range processors { - m := p.(*exportertest.SinkLogsExporter) + m := p.(*consumertest.SinkLogs) assert.Equal(t, wantMetricsCount, m.LogRecordsCount()) metricOrig := ld.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0) allLogs := m.AllLogs() diff --git a/processor/fanoutconnector_test.go b/processor/fanoutconnector_test.go index ffe6dbf96a3..bc49997bca9 100644 --- a/processor/fanoutconnector_test.go +++ b/processor/fanoutconnector_test.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/data/testdata" ) @@ -36,7 +35,7 @@ func TestTracesProcessorNotMultiplexing(t *testing.T) { func TestTracesProcessorMultiplexing(t *testing.T) { processors := make([]consumer.TraceConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkTraceExporter) + processors[i] = new(consumertest.SinkTraces) } tfc := NewTracesFanOutConnector(processors) @@ -53,7 +52,7 @@ func TestTracesProcessorMultiplexing(t *testing.T) { } for _, p := range processors { - m := p.(*exportertest.SinkTraceExporter) + m := p.(*consumertest.SinkTraces) assert.Equal(t, wantSpansCount, m.SpansCount()) assert.EqualValues(t, td, m.AllTraces()[0]) } @@ -62,11 +61,11 @@ func TestTracesProcessorMultiplexing(t *testing.T) { func TestTraceProcessorWhenOneErrors(t *testing.T) { processors := make([]consumer.TraceConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkTraceExporter) + processors[i] = new(consumertest.SinkTraces) } // Make one processor return error - processors[1].(*exportertest.SinkTraceExporter).SetConsumeTraceError(errors.New("my_error")) + processors[1].(*consumertest.SinkTraces).SetConsumeError(errors.New("my_error")) tfc := NewTracesFanOutConnector(processors) td := testdata.GenerateTraceDataOneSpan() @@ -81,9 +80,9 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) { } } - assert.Equal(t, 0, processors[1].(*exportertest.SinkTraceExporter).SpansCount()) - assert.Equal(t, wantSpansCount, processors[0].(*exportertest.SinkTraceExporter).SpansCount()) - assert.Equal(t, wantSpansCount, processors[2].(*exportertest.SinkTraceExporter).SpansCount()) + assert.Equal(t, 0, processors[1].(*consumertest.SinkTraces).SpansCount()) + assert.Equal(t, wantSpansCount, processors[0].(*consumertest.SinkTraces).SpansCount()) + assert.Equal(t, wantSpansCount, processors[2].(*consumertest.SinkTraces).SpansCount()) } func TestMetricsProcessorNotMultiplexing(t *testing.T) { @@ -95,7 +94,7 @@ func TestMetricsProcessorNotMultiplexing(t *testing.T) { func TestMetricsProcessorMultiplexing(t *testing.T) { processors := make([]consumer.MetricsConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkMetricsExporter) + processors[i] = new(consumertest.SinkMetrics) } mfc := NewMetricsFanOutConnector(processors) @@ -112,7 +111,7 @@ func TestMetricsProcessorMultiplexing(t *testing.T) { } for _, p := range processors { - m := p.(*exportertest.SinkMetricsExporter) + m := p.(*consumertest.SinkMetrics) assert.Equal(t, wantMetricsCount, m.MetricsCount()) assert.EqualValues(t, md, m.AllMetrics()[0]) } @@ -121,11 +120,11 @@ func TestMetricsProcessorMultiplexing(t *testing.T) { func TestMetricsProcessorWhenOneErrors(t *testing.T) { processors := make([]consumer.MetricsConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkMetricsExporter) + processors[i] = new(consumertest.SinkMetrics) } // Make one processor return error - processors[1].(*exportertest.SinkMetricsExporter).SetConsumeMetricsError(errors.New("my_error")) + processors[1].(*consumertest.SinkMetrics).SetConsumeError(errors.New("my_error")) mfc := NewMetricsFanOutConnector(processors) md := testdata.GenerateMetricsOneMetric() @@ -140,9 +139,9 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) { } } - assert.Equal(t, 0, processors[1].(*exportertest.SinkMetricsExporter).MetricsCount()) - assert.Equal(t, wantMetricsCount, processors[0].(*exportertest.SinkMetricsExporter).MetricsCount()) - assert.Equal(t, wantMetricsCount, processors[2].(*exportertest.SinkMetricsExporter).MetricsCount()) + assert.Equal(t, 0, processors[1].(*consumertest.SinkMetrics).MetricsCount()) + assert.Equal(t, wantMetricsCount, processors[0].(*consumertest.SinkMetrics).MetricsCount()) + assert.Equal(t, wantMetricsCount, processors[2].(*consumertest.SinkMetrics).MetricsCount()) } func TestLogsProcessorNotMultiplexing(t *testing.T) { @@ -154,7 +153,7 @@ func TestLogsProcessorNotMultiplexing(t *testing.T) { func TestLogsProcessorMultiplexing(t *testing.T) { processors := make([]consumer.LogsConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkLogsExporter) + processors[i] = new(consumertest.SinkLogs) } lfc := NewLogsFanOutConnector(processors) @@ -171,7 +170,7 @@ func TestLogsProcessorMultiplexing(t *testing.T) { } for _, p := range processors { - m := p.(*exportertest.SinkLogsExporter) + m := p.(*consumertest.SinkLogs) assert.Equal(t, wantMetricsCount, m.LogRecordsCount()) assert.EqualValues(t, ld, m.AllLogs()[0]) } @@ -180,11 +179,11 @@ func TestLogsProcessorMultiplexing(t *testing.T) { func TestLogsProcessorWhenOneErrors(t *testing.T) { processors := make([]consumer.LogsConsumer, 3) for i := range processors { - processors[i] = new(exportertest.SinkLogsExporter) + processors[i] = new(consumertest.SinkLogs) } // Make one processor return error - processors[1].(*exportertest.SinkLogsExporter).SetConsumeLogError(errors.New("my_error")) + processors[1].(*consumertest.SinkLogs).SetConsumeError(errors.New("my_error")) lfc := NewLogsFanOutConnector(processors) ld := testdata.GenerateLogDataOneLog() @@ -199,7 +198,7 @@ func TestLogsProcessorWhenOneErrors(t *testing.T) { } } - assert.Equal(t, 0, processors[1].(*exportertest.SinkLogsExporter).LogRecordsCount()) - assert.Equal(t, wantMetricsCount, processors[0].(*exportertest.SinkLogsExporter).LogRecordsCount()) - assert.Equal(t, wantMetricsCount, processors[2].(*exportertest.SinkLogsExporter).LogRecordsCount()) + assert.Equal(t, 0, processors[1].(*consumertest.SinkLogs).LogRecordsCount()) + assert.Equal(t, wantMetricsCount, processors[0].(*consumertest.SinkLogs).LogRecordsCount()) + assert.Equal(t, wantMetricsCount, processors[2].(*consumertest.SinkLogs).LogRecordsCount()) } diff --git a/processor/memorylimiter/memorylimiter_test.go b/processor/memorylimiter/memorylimiter_test.go index d088a895d54..dc8d9639389 100644 --- a/processor/memorylimiter/memorylimiter_test.go +++ b/processor/memorylimiter/memorylimiter_test.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/processor/memorylimiter/internal/iruntime" "go.opentelemetry.io/collector/processor/processorhelper" ) @@ -40,7 +39,7 @@ func TestNew(t *testing.T) { memoryLimitMiB uint32 memorySpikeLimitMiB uint32 } - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) tests := []struct { name string args args diff --git a/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go b/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go index 26b18a118c3..ac384a74662 100644 --- a/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go +++ b/processor/samplingprocessor/probabilisticsamplerprocessor/probabilisticsampler_test.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" tracetranslator "go.opentelemetry.io/collector/translator/trace" ) @@ -144,7 +143,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { const testSvcName = "test-svc" for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) tsp, err := newTraceProcessor(sink, tt.cfg) if err != nil { t.Errorf("error when creating tracesamplerprocessor: %v", err) @@ -206,7 +205,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t const testSvcName = "test-svc" for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) tsp, err := newTraceProcessor(sink, tt.cfg) if err != nil { t.Errorf("error when creating tracesamplerprocessor: %v", err) @@ -323,7 +322,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - sink := &exportertest.SinkTraceExporter{} + sink := new(consumertest.SinkTraces) tsp, err := newTraceProcessor(sink, tt.cfg) require.NoError(t, err) diff --git a/processor/samplingprocessor/tailsamplingprocessor/processor_test.go b/processor/samplingprocessor/tailsamplingprocessor/processor_test.go index b801b39c325..df50ff9c9f3 100644 --- a/processor/samplingprocessor/tailsamplingprocessor/processor_test.go +++ b/processor/samplingprocessor/tailsamplingprocessor/processor_test.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/processor/samplingprocessor/tailsamplingprocessor/idbatcher" "go.opentelemetry.io/collector/processor/samplingprocessor/tailsamplingprocessor/sampling" @@ -157,7 +156,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { const decisionWaitSeconds = 5 // For this test explicitly control the timer calls and batcher, and set a mock // sampling policy evaluator. - msp := new(exportertest.SinkTraceExporter) + msp := new(consumertest.SinkTraces) mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ @@ -213,7 +212,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { const decisionWaitSeconds = 5 // For this test explicitly control the timer calls and batcher, and set a mock // sampling policy evaluator. - msp := new(exportertest.SinkTraceExporter) + msp := new(consumertest.SinkTraces) mpe1 := &mockPolicyEvaluator{} mpe2 := &mockPolicyEvaluator{} mtt := &manualTTicker{} @@ -279,7 +278,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { const decisionWaitSeconds = 5 // For this test explicitly control the timer calls and batcher, and set a mock // sampling policy evaluator. - msp := new(exportertest.SinkTraceExporter) + msp := new(consumertest.SinkTraces) mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ @@ -338,7 +337,7 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { const decisionWaitSeconds = 1 // For this test explicitly control the timer calls and batcher, and set a mock // sampling policy evaluator. - msp := new(exportertest.SinkTraceExporter) + msp := new(consumertest.SinkTraces) mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ diff --git a/receiver/fluentforwardreceiver/receiver_test.go b/receiver/fluentforwardreceiver/receiver_test.go index 29aaf8903b5..bff09d11864 100644 --- a/receiver/fluentforwardreceiver/receiver_test.go +++ b/receiver/fluentforwardreceiver/receiver_test.go @@ -31,16 +31,16 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/receiver/fluentforwardreceiver/testdata" "go.opentelemetry.io/collector/testutil/logstest" ) -func setupServer(t *testing.T) (func() net.Conn, *exportertest.SinkLogsExporter, *observer.ObservedLogs, context.CancelFunc) { +func setupServer(t *testing.T) (func() net.Conn, *consumertest.SinkLogs, *observer.ObservedLogs, context.CancelFunc) { ctx, cancel := context.WithCancel(context.Background()) - next := &exportertest.SinkLogsExporter{} + next := new(consumertest.SinkLogs) logCore, logObserver := observer.New(zap.DebugLevel) logger := zap.New(logCore) @@ -359,7 +359,7 @@ func TestUnixEndpoint(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - next := &exportertest.SinkLogsExporter{} + next := new(consumertest.SinkLogs) tmpdir, err := ioutil.TempDir("", "fluent-socket") require.NoError(t, err) diff --git a/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go b/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go index 02a610df897..405c985573d 100644 --- a/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go +++ b/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go @@ -27,8 +27,8 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/diskscraper" @@ -92,7 +92,7 @@ var resourceFactories = map[string]internal.ResourceScraperFactory{ } func TestGatherMetrics_EndToEnd(t *testing.T) { - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) config := &Config{ CollectionInterval: 100 * time.Millisecond, @@ -236,7 +236,7 @@ func TestGatherMetrics_ScraperKeyConfigError(t *testing.T) { var mockFactories = map[string]internal.ScraperFactory{} var mockResourceFactories = map[string]internal.ResourceScraperFactory{} - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) config := &Config{Scrapers: map[string]internal.Config{"error": &mockConfig{}}} _, err := newHostMetricsReceiver(context.Background(), zap.NewNop(), config, mockFactories, mockResourceFactories, sink) @@ -249,7 +249,7 @@ func TestGatherMetrics_CreateMetricsScraperError(t *testing.T) { var mockFactories = map[string]internal.ScraperFactory{mockTypeStr: mFactory} var mockResourceFactories = map[string]internal.ResourceScraperFactory{} - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) config := &Config{Scrapers: map[string]internal.Config{mockTypeStr: &mockConfig{}}} _, err := newHostMetricsReceiver(context.Background(), zap.NewNop(), config, mockFactories, mockResourceFactories, sink) require.Error(t, err) @@ -261,7 +261,7 @@ func TestGatherMetrics_CreateMetricsResourceScraperError(t *testing.T) { var mockFactories = map[string]internal.ScraperFactory{} var mockResourceFactories = map[string]internal.ResourceScraperFactory{mockTypeStr: mResourceFactory} - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) config := &Config{Scrapers: map[string]internal.Config{mockTypeStr: &mockConfig{}}} _, err := newHostMetricsReceiver(context.Background(), zap.NewNop(), config, mockFactories, mockResourceFactories, sink) require.Error(t, err) @@ -276,7 +276,7 @@ func TestGatherMetrics_Error(t *testing.T) { var mockFactories = map[string]internal.ScraperFactory{mockTypeStr: mFactory} var mockResourceFactories = map[string]internal.ResourceScraperFactory{mockResourceTypeStr: mResourceFactory} - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) config := &Config{ Scrapers: map[string]internal.Config{ @@ -304,7 +304,7 @@ func TestGatherMetrics_Error(t *testing.T) { } func benchmarkScrapeMetrics(b *testing.B, cfg *Config) { - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) receiver, _ := newHostMetricsReceiver(context.Background(), zap.NewNop(), cfg, factories, resourceFactories, sink) receiver.initializeScrapers(context.Background(), componenttest.NewNopHost()) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index b72fdea591d..791bd9b0f74 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -37,8 +37,8 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/testutil" "go.opentelemetry.io/collector/translator/conventions" "go.opentelemetry.io/collector/translator/trace/jaeger" @@ -188,7 +188,7 @@ func TestJaegerHTTP(t *testing.T) { func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) { // 1. Create the Jaeger receiver aka "server" - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) params := component.ReceiverCreateParams{Logger: zap.NewNop()} jr, err := newJaegerReceiver(jaegerAgent, receiverConfig, sink, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index edd33aa456f..dd02d6aa581 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -49,7 +49,6 @@ import ( "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/testutil" "go.opentelemetry.io/collector/translator/conventions" tracetranslator "go.opentelemetry.io/collector/translator/trace" @@ -135,7 +134,7 @@ func TestReception(t *testing.T) { config := &configuration{ CollectorHTTPPort: int(port), // that's the only one used by this test } - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) params := component.ReceiverCreateParams{Logger: zap.NewNop()} jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) @@ -169,7 +168,7 @@ func TestPortsNotOpen(t *testing.T) { // an empty config should result in no open ports config := &configuration{} - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) params := component.ReceiverCreateParams{Logger: zap.NewNop()} jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) @@ -199,7 +198,7 @@ func TestGRPCReception(t *testing.T) { config := &configuration{ CollectorGRPCPort: 14250, // that's the only one used by this test } - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) params := component.ReceiverCreateParams{Logger: zap.NewNop()} jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) @@ -257,7 +256,7 @@ func TestGRPCReceptionWithTLS(t *testing.T) { CollectorGRPCPort: int(port), CollectorGRPCOptions: grpcServerOptions, } - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) params := component.ReceiverCreateParams{Logger: zap.NewNop()} jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) @@ -397,7 +396,7 @@ func TestSampling(t *testing.T) { CollectorGRPCPort: int(port), RemoteSamplingStrategyFile: "testdata/strategies.json", } - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) params := component.ReceiverCreateParams{Logger: zap.NewNop()} jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) @@ -450,7 +449,7 @@ func TestSamplingFailsOnNotConfigured(t *testing.T) { config := &configuration{ CollectorGRPCPort: int(port), } - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) params := component.ReceiverCreateParams{Logger: zap.NewNop()} jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) @@ -480,7 +479,7 @@ func TestSamplingFailsOnBadFile(t *testing.T) { CollectorGRPCPort: int(port), RemoteSamplingStrategyFile: "does-not-exist", } - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) params := component.ReceiverCreateParams{Logger: zap.NewNop()} jr, err := newJaegerReceiver(jaegerReceiver, config, sink, params) diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index ca9550e571a..fd8d3db9f70 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -33,7 +33,6 @@ import ( "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/kafkaexporter" otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" ) @@ -201,9 +200,9 @@ func TestConsumerGroupHandler_error_unmarshall(t *testing.T) { } func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) { - nextConsumer := &exportertest.SinkTraceExporter{} + nextConsumer := new(consumertest.SinkTraces) consumerError := fmt.Errorf("failed to consumer") - nextConsumer.SetConsumeTraceError(consumerError) + nextConsumer.SetConsumeError(consumerError) c := consumerGroupHandler{ unmarshaller: &otlpProtoUnmarshaller{}, logger: zap.NewNop(), diff --git a/receiver/opencensusreceiver/ocmetrics/opencensus_test.go b/receiver/opencensusreceiver/ocmetrics/opencensus_test.go index 8ec9972a96e..a1f015ba621 100644 --- a/receiver/opencensusreceiver/ocmetrics/opencensus_test.go +++ b/receiver/opencensusreceiver/ocmetrics/opencensus_test.go @@ -39,7 +39,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter/opencensusexporter" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" @@ -48,7 +48,7 @@ import ( ) func TestReceiver_endToEnd(t *testing.T) { - metricSink := new(exportertest.SinkMetricsExporter) + metricSink := new(consumertest.SinkMetrics) port, doneFn := ocReceiverOnGRPCServer(t, metricSink) defer doneFn() @@ -86,7 +86,7 @@ func TestReceiver_endToEnd(t *testing.T) { // accept nodes from downstream sources, but if a node isn't specified in // an exportMetrics request, assume it is from the last received and non-nil node. func TestExportMultiplexing(t *testing.T) { - metricSink := new(exportertest.SinkMetricsExporter) + metricSink := new(consumertest.SinkMetrics) port, doneFn := ocReceiverOnGRPCServer(t, metricSink) defer doneFn() @@ -198,7 +198,7 @@ func TestExportMultiplexing(t *testing.T) { // The first message without a Node MUST be rejected and teardown the connection. // See https://github.com/census-instrumentation/opencensus-service/issues/53 func TestExportProtocolViolations_nodelessFirstMessage(t *testing.T) { - metricSink := new(exportertest.SinkMetricsExporter) + metricSink := new(consumertest.SinkMetrics) port, doneFn := ocReceiverOnGRPCServer(t, metricSink) defer doneFn() @@ -270,7 +270,7 @@ func TestExportProtocolViolations_nodelessFirstMessage(t *testing.T) { func TestExportProtocolConformation_metricsInFirstMessage(t *testing.T) { // This test used to be flaky on Windows. Skip if errors pop up again - metricSink := new(exportertest.SinkMetricsExporter) + metricSink := new(consumertest.SinkMetrics) port, doneFn := ocReceiverOnGRPCServer(t, metricSink) defer doneFn() diff --git a/receiver/opencensusreceiver/octrace/opencensus_test.go b/receiver/opencensusreceiver/octrace/opencensus_test.go index 316c9949bb8..19e62faaa70 100644 --- a/receiver/opencensusreceiver/octrace/opencensus_test.go +++ b/receiver/opencensusreceiver/octrace/opencensus_test.go @@ -37,7 +37,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter/opencensusexporter" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" @@ -46,7 +46,7 @@ import ( ) func TestReceiver_endToEnd(t *testing.T) { - spanSink := new(exportertest.SinkTraceExporter) + spanSink := new(consumertest.SinkTraces) port, doneFn := ocReceiverOnGRPCServer(t, spanSink) defer doneFn() @@ -84,7 +84,7 @@ func TestReceiver_endToEnd(t *testing.T) { // accept nodes from downstream sources, but if a node isn't specified in // an exportTrace request, assume it is from the last received and non-nil node. func TestExportMultiplexing(t *testing.T) { - spanSink := new(exportertest.SinkTraceExporter) + spanSink := new(consumertest.SinkTraces) port, doneFn := ocReceiverOnGRPCServer(t, spanSink) defer doneFn() @@ -213,7 +213,7 @@ func TestExportMultiplexing(t *testing.T) { // The first message without a Node MUST be rejected and teardown the connection. // See https://github.com/census-instrumentation/opencensus-service/issues/53 func TestExportProtocolViolations_nodelessFirstMessage(t *testing.T) { - spanSink := new(exportertest.SinkTraceExporter) + spanSink := new(consumertest.SinkTraces) port, doneFn := ocReceiverOnGRPCServer(t, spanSink) defer doneFn() @@ -281,7 +281,7 @@ func TestExportProtocolViolations_nodelessFirstMessage(t *testing.T) { // spans should be received and NEVER discarded. // See https://github.com/census-instrumentation/opencensus-service/issues/51 func TestExportProtocolConformation_spansInFirstMessage(t *testing.T) { - spanSink := new(exportertest.SinkTraceExporter) + spanSink := new(consumertest.SinkTraces) port, doneFn := ocReceiverOnGRPCServer(t, spanSink) defer doneFn() diff --git a/receiver/opencensusreceiver/opencensus_test.go b/receiver/opencensusreceiver/opencensus_test.go index d660647adc2..c49504179c8 100644 --- a/receiver/opencensusreceiver/opencensus_test.go +++ b/receiver/opencensusreceiver/opencensus_test.go @@ -46,7 +46,6 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/obsreport/obsreporttest" "go.opentelemetry.io/collector/testutil" "go.opentelemetry.io/collector/translator/internaldata" @@ -59,7 +58,7 @@ func TestGrpcGateway_endToEnd(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) ocr, err := newOpenCensusReceiver(ocReceiverName, "tcp", addr, sink, nil) require.NoError(t, err, "Failed to create trace receiver: %v", err) @@ -433,7 +432,7 @@ func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) { require.NoError(t, err) defer doneFn() - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) var opts []ocOption ocr, err := newOpenCensusReceiver(exporter.receiverTag, "tcp", addr, nil, nil, opts...) @@ -452,9 +451,9 @@ func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) { for _, ingestionState := range tt.ingestionStates { if ingestionState.okToIngest { - sink.SetConsumeTraceError(nil) + sink.SetConsumeError(nil) } else { - sink.SetConsumeTraceError(fmt.Errorf("%q: consumer error", tt.name)) + sink.SetConsumeError(fmt.Errorf("%q: consumer error", tt.name)) } err = exporter.exportFn(t, cc, msg) @@ -582,7 +581,7 @@ func TestOCReceiverMetrics_HandleNextConsumerResponse(t *testing.T) { require.NoError(t, err) defer doneFn() - sink := new(exportertest.SinkMetricsExporter) + sink := new(consumertest.SinkMetrics) var opts []ocOption ocr, err := newOpenCensusReceiver(exporter.receiverTag, "tcp", addr, nil, nil, opts...) @@ -601,9 +600,9 @@ func TestOCReceiverMetrics_HandleNextConsumerResponse(t *testing.T) { for _, ingestionState := range tt.ingestionStates { if ingestionState.okToIngest { - sink.SetConsumeMetricsError(nil) + sink.SetConsumeError(nil) } else { - sink.SetConsumeMetricsError(fmt.Errorf("%q: consumer error", tt.name)) + sink.SetConsumeError(fmt.Errorf("%q: consumer error", tt.name)) } err = exporter.exportFn(t, cc, msg) diff --git a/receiver/otlpreceiver/factory_test.go b/receiver/otlpreceiver/factory_test.go index 1e7136f970b..9e75b4c972c 100644 --- a/receiver/otlpreceiver/factory_test.go +++ b/receiver/otlpreceiver/factory_test.go @@ -30,7 +30,7 @@ import ( "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/testutil" ) @@ -50,11 +50,11 @@ func TestCreateReceiver(t *testing.T) { config.HTTP.Endpoint = testutil.GetAvailableLocalAddress(t) creationParams := component.ReceiverCreateParams{Logger: zap.NewNop()} - tReceiver, err := factory.CreateTraceReceiver(context.Background(), creationParams, cfg, new(exportertest.SinkTraceExporter)) + tReceiver, err := factory.CreateTraceReceiver(context.Background(), creationParams, cfg, new(consumertest.SinkTraces)) assert.NotNil(t, tReceiver) assert.NoError(t, err) - mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, new(exportertest.SinkMetricsExporter)) + mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, new(consumertest.SinkMetrics)) assert.NotNil(t, mReceiver) assert.NoError(t, err) } @@ -130,7 +130,7 @@ func TestCreateTraceReceiver(t *testing.T) { creationParams := component.ReceiverCreateParams{Logger: zap.NewNop()} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) tr, err := factory.CreateTraceReceiver(ctx, creationParams, tt.cfg, sink) assert.NoError(t, err) require.NotNil(t, tr) @@ -215,7 +215,7 @@ func TestCreateMetricReceiver(t *testing.T) { creationParams := component.ReceiverCreateParams{Logger: zap.NewNop()} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - sink := new(exportertest.SinkMetricsExporter) + sink := new(consumertest.SinkMetrics) mr, err := factory.CreateMetricsReceiver(ctx, creationParams, tt.cfg, sink) assert.NoError(t, err) require.NotNil(t, mr) @@ -261,7 +261,7 @@ func TestCreateLogReceiver(t *testing.T) { HTTP: defaultHTTPSettings, }, }, - sink: new(exportertest.SinkLogsExporter), + sink: new(consumertest.SinkLogs), }, { name: "invalid_grpc_address", @@ -281,7 +281,7 @@ func TestCreateLogReceiver(t *testing.T) { }, }, wantStartErr: true, - sink: new(exportertest.SinkLogsExporter), + sink: new(consumertest.SinkLogs), }, { name: "invalid_http_address", @@ -298,7 +298,7 @@ func TestCreateLogReceiver(t *testing.T) { }, }, wantStartErr: true, - sink: new(exportertest.SinkLogsExporter), + sink: new(consumertest.SinkLogs), }, { name: "no_next_consumer", @@ -327,7 +327,7 @@ func TestCreateLogReceiver(t *testing.T) { Protocols: Protocols{}, }, wantErr: false, - sink: new(exportertest.SinkLogsExporter), + sink: new(consumertest.SinkLogs), }, } ctx := context.Background() diff --git a/receiver/otlpreceiver/logs/otlp_test.go b/receiver/otlpreceiver/logs/otlp_test.go index d7e819ec430..a714ed97e3d 100644 --- a/receiver/otlpreceiver/logs/otlp_test.go +++ b/receiver/otlpreceiver/logs/otlp_test.go @@ -26,8 +26,8 @@ import ( "google.golang.org/grpc" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal" collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1" v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" @@ -41,7 +41,7 @@ var _ collectorlog.LogsServiceServer = (*Receiver)(nil) func TestExport(t *testing.T) { // given - logSink := new(exportertest.SinkLogsExporter) + logSink := new(consumertest.SinkLogs) port, doneFn := otlpReceiverOnGRPCServer(t, logSink) defer doneFn() @@ -97,7 +97,7 @@ func TestExport(t *testing.T) { } func TestExport_EmptyRequest(t *testing.T) { - logSink := new(exportertest.SinkLogsExporter) + logSink := new(consumertest.SinkLogs) port, doneFn := otlpReceiverOnGRPCServer(t, logSink) defer doneFn() @@ -112,8 +112,8 @@ func TestExport_EmptyRequest(t *testing.T) { } func TestExport_ErrorConsumer(t *testing.T) { - logSink := new(exportertest.SinkLogsExporter) - logSink.SetConsumeLogError(fmt.Errorf("error")) + logSink := new(consumertest.SinkLogs) + logSink.SetConsumeError(fmt.Errorf("error")) port, doneFn := otlpReceiverOnGRPCServer(t, logSink) defer doneFn() diff --git a/receiver/otlpreceiver/metrics/otlp_test.go b/receiver/otlpreceiver/metrics/otlp_test.go index 78c2cfc557e..14c88e4dc3d 100644 --- a/receiver/otlpreceiver/metrics/otlp_test.go +++ b/receiver/otlpreceiver/metrics/otlp_test.go @@ -25,8 +25,8 @@ import ( "google.golang.org/grpc" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" collectormetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1" otlpcommon "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" @@ -39,7 +39,7 @@ var _ collectormetrics.MetricsServiceServer = (*Receiver)(nil) func TestExport(t *testing.T) { // given - metricSink := new(exportertest.SinkMetricsExporter) + metricSink := new(consumertest.SinkMetrics) port, doneFn := otlpReceiverOnGRPCServer(t, metricSink) defer doneFn() @@ -122,7 +122,7 @@ func TestExport(t *testing.T) { func TestExport_EmptyRequest(t *testing.T) { // given - metricSink := new(exportertest.SinkMetricsExporter) + metricSink := new(consumertest.SinkMetrics) port, doneFn := otlpReceiverOnGRPCServer(t, metricSink) defer doneFn() @@ -139,8 +139,8 @@ func TestExport_EmptyRequest(t *testing.T) { func TestExport_ErrorConsumer(t *testing.T) { // given - metricSink := new(exportertest.SinkMetricsExporter) - metricSink.SetConsumeMetricsError(fmt.Errorf("error")) + metricSink := new(consumertest.SinkMetrics) + metricSink.SetConsumeError(fmt.Errorf("error")) port, doneFn := otlpReceiverOnGRPCServer(t, metricSink) defer doneFn() diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index d48bfc46304..564fa1d84ba 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -44,8 +44,8 @@ import ( "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" collectortrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" otlpcommon "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" otlpresource "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/resource/v1" @@ -154,7 +154,7 @@ func TestJsonHttp(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) ocr := newHTTPReceiver(t, addr, sink, nil) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") @@ -177,7 +177,7 @@ func TestJsonHttp(t *testing.T) { default: buf = bytes.NewBuffer(traceJSON) } - sink.SetConsumeTraceError(test.err) + sink.SetConsumeError(test.err) req, err := http.NewRequest("POST", url, buf) require.NoError(t, err, "Error creating trace POST request: %v", err) req.Header.Set("Content-Type", "application/json") @@ -315,8 +315,8 @@ func TestProtoHttp(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - tSink := new(exportertest.SinkTraceExporter) - mSink := new(exportertest.SinkMetricsExporter) + tSink := new(consumertest.SinkTraces) + mSink := new(consumertest.SinkMetrics) ocr := newHTTPReceiver(t, addr, tSink, mSink) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") @@ -347,7 +347,7 @@ func TestProtoHttp(t *testing.T) { default: buf = bytes.NewBuffer(traceBytes) } - tSink.SetConsumeTraceError(test.err) + tSink.SetConsumeError(test.err) req, err := http.NewRequest("POST", url, buf) require.NoError(t, err, "Error creating trace POST request: %v", err) req.Header.Set("Content-Type", "application/x-protobuf") @@ -435,8 +435,8 @@ func TestOTLPReceiverInvalidContentEncoding(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - tSink := new(exportertest.SinkTraceExporter) - mSink := new(exportertest.SinkMetricsExporter) + tSink := new(consumertest.SinkTraces) + mSink := new(consumertest.SinkMetrics) ocr := newHTTPReceiver(t, addr, tSink, mSink) require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") @@ -480,7 +480,7 @@ func TestGRPCNewPortAlreadyUsed(t *testing.T) { require.NoError(t, err, "failed to listen on %q: %v", addr, err) defer ln.Close() - r := newGRPCReceiver(t, otlpReceiverName, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + r := newGRPCReceiver(t, otlpReceiverName, addr, new(consumertest.SinkTraces), new(consumertest.SinkMetrics)) require.NotNil(t, r) require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) @@ -492,7 +492,7 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) { require.NoError(t, err, "failed to listen on %q: %v", addr, err) defer ln.Close() - r := newHTTPReceiver(t, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + r := newHTTPReceiver(t, addr, new(consumertest.SinkTraces), new(consumertest.SinkMetrics)) require.NotNil(t, r) require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) @@ -602,7 +602,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { require.NoError(t, err) defer doneFn() - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) ocr := newGRPCReceiver(t, exporter.receiverTag, addr, sink, nil) require.NotNil(t, ocr) @@ -615,9 +615,9 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { for _, ingestionState := range tt.ingestionStates { if ingestionState.okToIngest { - sink.SetConsumeTraceError(nil) + sink.SetConsumeError(nil) } else { - sink.SetConsumeTraceError(fmt.Errorf("%q: consumer error", tt.name)) + sink.SetConsumeError(fmt.Errorf("%q: consumer error", tt.name)) } err = exporter.exportFn(t, cc, req) @@ -679,7 +679,7 @@ func TestHTTPInvalidTLSCredentials(t *testing.T) { } // TLS is resolved during Start for HTTP. - r := newReceiver(t, NewFactory(), cfg, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + r := newReceiver(t, NewFactory(), cfg, new(consumertest.SinkTraces), new(consumertest.SinkMetrics)) assert.EqualError(t, r.Start(context.Background(), componenttest.NewNopHost()), `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) } diff --git a/receiver/otlpreceiver/trace/otlp_test.go b/receiver/otlpreceiver/trace/otlp_test.go index 2fd0452e55e..8fae16e749d 100644 --- a/receiver/otlpreceiver/trace/otlp_test.go +++ b/receiver/otlpreceiver/trace/otlp_test.go @@ -26,8 +26,8 @@ import ( "google.golang.org/grpc" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" collectortrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1" @@ -40,7 +40,7 @@ var _ collectortrace.TraceServiceServer = (*Receiver)(nil) func TestExport(t *testing.T) { // given - traceSink := new(exportertest.SinkTraceExporter) + traceSink := new(consumertest.SinkTraces) port, doneFn := otlpReceiverOnGRPCServer(t, traceSink) defer doneFn() @@ -100,7 +100,7 @@ func TestExport(t *testing.T) { } func TestExport_EmptyRequest(t *testing.T) { - traceSink := new(exportertest.SinkTraceExporter) + traceSink := new(consumertest.SinkTraces) port, doneFn := otlpReceiverOnGRPCServer(t, traceSink) defer doneFn() @@ -115,8 +115,8 @@ func TestExport_EmptyRequest(t *testing.T) { } func TestExport_ErrorConsumer(t *testing.T) { - traceSink := new(exportertest.SinkTraceExporter) - traceSink.SetConsumeTraceError(fmt.Errorf("error")) + traceSink := new(consumertest.SinkTraces) + traceSink.SetConsumeError(fmt.Errorf("error")) port, doneFn := otlpReceiverOnGRPCServer(t, traceSink) defer doneFn() diff --git a/receiver/prometheusreceiver/internal/transaction_test.go b/receiver/prometheusreceiver/internal/transaction_test.go index 79bf8b8d24b..1672801be8c 100644 --- a/receiver/prometheusreceiver/internal/transaction_test.go +++ b/receiver/prometheusreceiver/internal/transaction_test.go @@ -26,7 +26,6 @@ import ( "google.golang.org/protobuf/proto" "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/translator/internaldata" ) @@ -103,7 +102,7 @@ func Test_transaction(t *testing.T) { {Name: "job", Value: "test"}, {Name: "__name__", Value: "foo"}}) t.Run("Add One Good", func(t *testing.T) { - sink := new(exportertest.SinkMetricsExporter) + sink := new(consumertest.SinkMetrics) tr := newTransaction(context.Background(), nil, true, "", rn, ms, sink, testLogger) if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, 1.0); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) @@ -133,7 +132,7 @@ func Test_transaction(t *testing.T) { }) t.Run("Error when start time is zero", func(t *testing.T) { - sink := new(exportertest.SinkMetricsExporter) + sink := new(consumertest.SinkMetrics) tr := newTransaction(context.Background(), nil, true, "", rn, ms, sink, testLogger) if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, 1.0); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) @@ -148,7 +147,7 @@ func Test_transaction(t *testing.T) { }) t.Run("Drop NaN value", func(t *testing.T) { - sink := new(exportertest.SinkMetricsExporter) + sink := new(consumertest.SinkMetrics) tr := newTransaction(context.Background(), nil, true, "", rn, ms, sink, testLogger) if _, got := tr.Add(goodLabels, time.Now().Unix()*1000, math.NaN()); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index 1a9d976f8e5..5c8543609fc 100644 --- a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -39,7 +39,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/translator/internaldata" ) @@ -954,7 +954,7 @@ func testEndToEnd(t *testing.T, targets []*testData, useStartTimeMetric bool) { require.Nilf(t, err, "Failed to create Promtheus config: %v", err) defer mp.Close() - cms := new(exportertest.SinkMetricsExporter) + cms := new(consumertest.SinkMetrics) rcvr := newPrometheusReceiver(logger, &Config{PrometheusConfig: cfg, UseStartTimeMetric: useStartTimeMetric}, cms) require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()), "Failed to invoke Start: %v", err) @@ -1041,7 +1041,7 @@ func testEndToEndRegex(t *testing.T, targets []*testData, useStartTimeMetric boo require.Nilf(t, err, "Failed to create Promtheus config: %v", err) defer mp.Close() - cms := new(exportertest.SinkMetricsExporter) + cms := new(consumertest.SinkMetrics) rcvr := newPrometheusReceiver(logger, &Config{PrometheusConfig: cfg, UseStartTimeMetric: useStartTimeMetric, StartTimeMetricRegex: startTimeMetricRegex}, cms) require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()), "Failed to invoke Start: %v", err) diff --git a/receiver/receiverhelper/scrapercontroller_test.go b/receiver/receiverhelper/scrapercontroller_test.go index 410d2188d2f..ea705b2e403 100644 --- a/receiver/receiverhelper/scrapercontroller_test.go +++ b/receiver/receiverhelper/scrapercontroller_test.go @@ -26,8 +26,8 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" ) type testInitialize struct { @@ -187,7 +187,7 @@ func TestScrapeController(t *testing.T) { options := configureMetricOptions(test, initializeChs, scrapeMetricsChs, scrapeResourceMetricsChs, closeChs) var nextConsumer consumer.MetricsConsumer - sink := &exportertest.SinkMetricsExporter{} + sink := new(consumertest.SinkMetrics) if !test.nilNextConsumer { nextConsumer = sink } @@ -343,7 +343,7 @@ func TestSingleScrapePerTick(t *testing.T) { receiver, err := NewScraperControllerReceiver( cfg, - &exportertest.SinkMetricsExporter{}, + new(consumertest.SinkMetrics), AddMetricsScraper(NewMetricsScraper(tsm.scrape)), AddResourceMetricsScraper(NewResourceMetricsScraper(tsrm.scrape)), WithTickerChannel(tickerCh), diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index d117184fbb1..29567a22a8a 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -44,7 +44,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/zipkinexporter" "go.opentelemetry.io/collector/testutil" "go.opentelemetry.io/collector/translator/conventions" @@ -283,7 +282,7 @@ func TestStartTraceReception(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - sink := new(exportertest.SinkTraceExporter) + sink := new(consumertest.SinkTraces) cfg := &Config{ ReceiverSettings: configmodels.ReceiverSettings{ NameVal: zipkinReceiverName,