Skip to content

Commit

Permalink
Deprecate SinkExporter, add SinkConsumer
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 19, 2020
1 parent 74ded3f commit 96297d6
Show file tree
Hide file tree
Showing 28 changed files with 434 additions and 159 deletions.
183 changes: 183 additions & 0 deletions consumer/consumertest/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumertest

import (
"context"
"sync"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
)

type baseErrorConsumer struct {
mu sync.Mutex
consumeError error // to be returned by ConsumeTraces, if set
}

// SetConsumeError sets an error that will be returned by the Consume function.
func (bec *baseErrorConsumer) SetConsumeError(err error) {
bec.mu.Lock()
defer bec.mu.Unlock()
bec.consumeError = err
}

// SinkTraces acts as a trace receiver for use in tests.
type SinkTraces struct {
baseErrorConsumer
traces []pdata.Traces
spansCount int
}

var _ consumer.TraceConsumer = (*SinkTraces)(nil)

// ConsumeTraceData stores traces for tests.
func (ste *SinkTraces) ConsumeTraces(_ context.Context, td pdata.Traces) error {
ste.mu.Lock()
defer ste.mu.Unlock()

if ste.consumeError != nil {
return ste.consumeError
}

ste.traces = append(ste.traces, td)
ste.spansCount += td.SpanCount()

return nil
}

// AllTraces returns the traces sent to the test sink.
func (ste *SinkTraces) AllTraces() []pdata.Traces {
ste.mu.Lock()
defer ste.mu.Unlock()

copyTraces := make([]pdata.Traces, len(ste.traces))
copy(copyTraces, ste.traces)
return copyTraces
}

// SpansCount return the number of spans sent to the test sing.
func (ste *SinkTraces) SpansCount() int {
ste.mu.Lock()
defer ste.mu.Unlock()
return ste.spansCount
}

// Reset deletes any existing metrics.
func (ste *SinkTraces) Reset() {
ste.mu.Lock()
defer ste.mu.Unlock()

ste.traces = nil
ste.spansCount = 0
}

// SinkMetrics acts as a metrics receiver for use in tests.
type SinkMetrics struct {
baseErrorConsumer
metrics []pdata.Metrics
metricsCount int
}

var _ consumer.MetricsConsumer = (*SinkMetrics)(nil)

// ConsumeMetricsData stores traces for tests.
func (sme *SinkMetrics) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
sme.mu.Lock()
defer sme.mu.Unlock()
if sme.consumeError != nil {
return sme.consumeError
}

sme.metrics = append(sme.metrics, md)
sme.metricsCount += md.MetricCount()

return nil
}

// AllMetrics returns the metrics sent to the test sink.
func (sme *SinkMetrics) AllMetrics() []pdata.Metrics {
sme.mu.Lock()
defer sme.mu.Unlock()

copyMetrics := make([]pdata.Metrics, len(sme.metrics))
copy(copyMetrics, sme.metrics)
return copyMetrics
}

// MetricsCount return the number of metrics sent to the test sing.
func (sme *SinkMetrics) MetricsCount() int {
sme.mu.Lock()
defer sme.mu.Unlock()
return sme.metricsCount
}

// Reset deletes any existing metrics.
func (sme *SinkMetrics) Reset() {
sme.mu.Lock()
defer sme.mu.Unlock()

sme.metrics = nil
sme.metricsCount = 0
}

// SinkLogs acts as a metrics receiver for use in tests.
type SinkLogs struct {
baseErrorConsumer
logs []pdata.Logs
logRecordsCount int
}

var _ consumer.LogsConsumer = (*SinkLogs)(nil)

// ConsumeLogData stores traces for tests.
func (sle *SinkLogs) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
sle.mu.Lock()
defer sle.mu.Unlock()
if sle.consumeError != nil {
return sle.consumeError
}

sle.logs = append(sle.logs, ld)
sle.logRecordsCount += ld.LogRecordCount()

return nil
}

// AllLog returns the metrics sent to the test sink.
func (sle *SinkLogs) AllLogs() []pdata.Logs {
sle.mu.Lock()
defer sle.mu.Unlock()

copyLogs := make([]pdata.Logs, len(sle.logs))
copy(copyLogs, sle.logs)
return copyLogs
}

// LogRecordsCount return the number of log records sent to the test sing.
func (sle *SinkLogs) LogRecordsCount() int {
sle.mu.Lock()
defer sle.mu.Unlock()
return sle.logRecordsCount
}

// Reset deletes any existing logs.
func (sle *SinkLogs) Reset() {
sle.mu.Lock()
defer sle.mu.Unlock()

sle.logs = nil
sle.logRecordsCount = 0
}
99 changes: 99 additions & 0 deletions consumer/consumertest/sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumertest

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data/testdata"
)

func TestSinkTraces(t *testing.T) {
sink := new(SinkTraces)
td := testdata.GenerateTraceDataOneSpan()
want := make([]pdata.Traces, 0, 7)
for i := 0; i < 7; i++ {
require.NoError(t, sink.ConsumeTraces(context.Background(), td))
want = append(want, td)
}
assert.Equal(t, want, sink.AllTraces())
assert.Equal(t, len(want), sink.SpansCount())
sink.Reset()
assert.Equal(t, 0, len(sink.AllTraces()))
assert.Equal(t, 0, sink.SpansCount())
}

func TestSinkTraces_Error(t *testing.T) {
sink := new(SinkTraces)
sink.SetConsumeError(errors.New("my error"))
td := testdata.GenerateTraceDataOneSpan()
require.Error(t, sink.ConsumeTraces(context.Background(), td))
assert.Len(t, sink.AllTraces(), 0)
assert.Equal(t, 0, sink.SpansCount())
}

func TestSinkMetrics(t *testing.T) {
sink := new(SinkMetrics)
md := testdata.GenerateMetricsOneMetric()
want := make([]pdata.Metrics, 0, 7)
for i := 0; i < 7; i++ {
require.NoError(t, sink.ConsumeMetrics(context.Background(), md))
want = append(want, md)
}
assert.Equal(t, want, sink.AllMetrics())
assert.Equal(t, len(want), sink.MetricsCount())
sink.Reset()
assert.Equal(t, 0, len(sink.AllMetrics()))
assert.Equal(t, 0, sink.MetricsCount())
}

func TestSinkMetrics_Error(t *testing.T) {
sink := new(SinkMetrics)
sink.SetConsumeError(errors.New("my error"))
md := testdata.GenerateMetricsOneMetric()
require.Error(t, sink.ConsumeMetrics(context.Background(), md))
assert.Len(t, sink.AllMetrics(), 0)
assert.Equal(t, 0, sink.MetricsCount())
}

func TestSinkLogs(t *testing.T) {
sink := new(SinkLogs)
md := testdata.GenerateLogDataOneLogNoResource()
want := make([]pdata.Logs, 0, 7)
for i := 0; i < 7; i++ {
require.NoError(t, sink.ConsumeLogs(context.Background(), md))
want = append(want, md)
}
assert.Equal(t, want, sink.AllLogs())
assert.Equal(t, len(want), sink.LogRecordsCount())
sink.Reset()
assert.Equal(t, 0, len(sink.AllLogs()))
assert.Equal(t, 0, sink.LogRecordsCount())
}

func TestSinkLogs_Error(t *testing.T) {
sink := new(SinkLogs)
sink.SetConsumeError(errors.New("my error"))
ld := testdata.GenerateLogDataOneLogNoResource()
require.Error(t, sink.ConsumeLogs(context.Background(), ld))
assert.Len(t, sink.AllLogs(), 0)
assert.Equal(t, 0, sink.LogRecordsCount())
}
3 changes: 3 additions & 0 deletions exporter/exportertest/sink_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

// SinkTraceExporter acts as a trace receiver for use in tests.
// Deprecated: Use consumertest.SinkTraces
type SinkTraceExporter struct {
mu sync.Mutex
consumeTraceError error // to be returned by ConsumeTraces, if set
Expand Down Expand Up @@ -92,6 +93,7 @@ func (ste *SinkTraceExporter) Shutdown(context.Context) error {
}

// SinkMetricsExporter acts as a metrics receiver for use in tests.
// Deprecated: Use consumertest.SinkMetrics
type SinkMetricsExporter struct {
mu sync.Mutex
consumeMetricsError error // to be returned by ConsumeMetrics, if set
Expand Down Expand Up @@ -159,6 +161,7 @@ func (sme *SinkMetricsExporter) Shutdown(context.Context) error {
}

// SinkLogsExporter acts as a metrics receiver for use in tests.
// Deprecated: Use consumertest.SinkLogs
type SinkLogsExporter struct {
consumeLogError error // to be returned by ConsumeLog, if set
mu sync.Mutex
Expand Down
6 changes: 3 additions & 3 deletions exporter/opencensusexporter/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/data/testdata"
"go.opentelemetry.io/collector/receiver/opencensusreceiver"
"go.opentelemetry.io/collector/testutil"
)

func TestSendTraces(t *testing.T) {
sink := &exportertest.SinkTraceExporter{}
sink := new(consumertest.SinkTraces)
rFactory := opencensusreceiver.NewFactory()
rCfg := rFactory.CreateDefaultConfig().(*opencensusreceiver.Config)
endpoint := testutil.GetAvailableLocalAddress(t)
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestSendTraces_AfterStop(t *testing.T) {
}

func TestSendMetrics(t *testing.T) {
sink := &exportertest.SinkMetricsExporter{}
sink := new(consumertest.SinkMetrics)
rFactory := opencensusreceiver.NewFactory()
rCfg := rFactory.CreateDefaultConfig().(*opencensusreceiver.Config)
endpoint := testutil.GetAvailableLocalAddress(t)
Expand Down
Loading

0 comments on commit 96297d6

Please sign in to comment.