Skip to content

Commit

Permalink
[chore] cleaning up test to use mdatagen test harness (#11098)
Browse files Browse the repository at this point in the history
Follow up for
#10910 (comment),
ping @djaglowski as the author of the PR

Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
codeboten authored Sep 10, 2024
1 parent 936ffbb commit 340e775
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 159 deletions.
81 changes: 30 additions & 51 deletions processor/processorhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ package processorhelper
import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -91,60 +86,44 @@ func TestLogsProcessor_RecordInOut(t *testing.T) {
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()

metricReader := sdkmetric.NewManualReader()
set := processortest.NewNopSettings()
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic
set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider {
if level >= configtelemetry.LevelBasic {
return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))
}
return nil
}

lp, err := NewLogsProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate)
testTelemetry := setupTestTelemetry()
lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate)
require.NoError(t, err)

assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs))
assert.NoError(t, lp.Shutdown(context.Background()))

ownMetrics := new(metricdata.ResourceMetrics)
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))

require.Len(t, ownMetrics.ScopeMetrics, 1)
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)

inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
if strings.Contains(inMetric.Name, "outgoing") {
inMetric, outMetric = outMetric, inMetric
}

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 3,
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_log_records",
Description: "Number of log records passed to the processor.",
Unit: "{records}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, inMetric.Data, metricdatatest.IgnoreTimestamp())

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 1,
{
Name: "otelcol_processor_outgoing_log_records",
Description: "Number of log records emitted from the processor.",
Unit: "{records}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
})
}
89 changes: 34 additions & 55 deletions processor/processorhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ package processorhelper
import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -76,76 +71,60 @@ func newTestMProcessor(retError error) ProcessMetricsFunc {
}

func TestMetricsProcessor_RecordInOut(t *testing.T) {
// Regardless of how many data points are ingested, emit just one
// Regardless of how many data points are ingested, emit 3
mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) {
md := pmetric.NewMetrics()
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
return md, nil
}

incomingMetrics := pmetric.NewMetrics()
dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints()

// Add 3 data points to the incoming
dps.AppendEmpty()
// Add 2 data points to the incoming
dps.AppendEmpty()
dps.AppendEmpty()

metricReader := sdkmetric.NewManualReader()
set := processortest.NewNopSettings()
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic
set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider {
if level >= configtelemetry.LevelBasic {
return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))
}
return nil
}

mp, err := NewMetricsProcessor(context.Background(), set, &testMetricsCfg, consumertest.NewNop(), mockAggregate)
testTelemetry := setupTestTelemetry()
mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockAggregate)
require.NoError(t, err)

assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics))
assert.NoError(t, mp.Shutdown(context.Background()))

ownMetrics := new(metricdata.ResourceMetrics)
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))

require.Len(t, ownMetrics.ScopeMetrics, 1)
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)

inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
if strings.Contains(inMetric.Name, "outgoing") {
inMetric, outMetric = outMetric, inMetric
}

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 3,
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_metric_points",
Description: "Number of metric points passed to the processor.",
Unit: "{datapoints}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, inMetric.Data, metricdatatest.IgnoreTimestamp())

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 1,
{
Name: "otelcol_processor_outgoing_metric_points",
Description: "Number of metric points emitted from the processor.",
Unit: "{datapoints}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
})
}
85 changes: 32 additions & 53 deletions processor/processorhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ package processorhelper
import (
"context"
"errors"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -86,66 +81,50 @@ func TestTracesProcessor_RecordInOut(t *testing.T) {
incomingTraces := ptrace.NewTraces()
incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()

// Add 3 records to the incoming
// Add 4 records to the incoming
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()

metricReader := sdkmetric.NewManualReader()
set := processortest.NewNopSettings()
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal
set.TelemetrySettings.MetricsLevel = configtelemetry.LevelBasic
set.TelemetrySettings.LeveledMeterProvider = func(level configtelemetry.Level) metric.MeterProvider {
if level >= configtelemetry.LevelBasic {
return sdkmetric.NewMeterProvider(sdkmetric.WithReader(metricReader))
}
return nil
}

tp, err := NewTracesProcessor(context.Background(), set, &testLogsCfg, consumertest.NewNop(), mockAggregate)
testTelemetry := setupTestTelemetry()
tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate)
require.NoError(t, err)

assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, tp.ConsumeTraces(context.Background(), incomingTraces))
assert.NoError(t, tp.Shutdown(context.Background()))

ownMetrics := new(metricdata.ResourceMetrics)
require.NoError(t, metricReader.Collect(context.Background(), ownMetrics))

require.Len(t, ownMetrics.ScopeMetrics, 1)
require.Len(t, ownMetrics.ScopeMetrics[0].Metrics, 2)

inMetric := ownMetrics.ScopeMetrics[0].Metrics[0]
outMetric := ownMetrics.ScopeMetrics[0].Metrics[1]
if strings.Contains(inMetric.Name, "outgoing") {
inMetric, outMetric = outMetric, inMetric
}

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 3,
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_spans",
Description: "Number of spans passed to the processor.",
Unit: "{spans}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 4,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, inMetric.Data, metricdatatest.IgnoreTimestamp())

metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("processor"),
Value: attribute.StringValue(set.ID.String()),
}),
Value: 1,
{
Name: "otelcol_processor_outgoing_spans",
Description: "Number of spans emitted from the processor.",
Unit: "{spans}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
},
},
},
},
}, outMetric.Data, metricdatatest.IgnoreTimestamp())
})
}

0 comments on commit 340e775

Please sign in to comment.