diff --git a/cmd/otelcontribcol/processors_test.go b/cmd/otelcontribcol/processors_test.go index a640c815bedb..a8138b46a342 100644 --- a/cmd/otelcontribcol/processors_test.go +++ b/cmd/otelcontribcol/processors_test.go @@ -22,8 +22,9 @@ import ( "go.opentelemetry.io/collector/processor/processortest" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor" - remoteobserverprocessor "github.com/open-telemetry/opentelemetry-collector-contrib/processor/remoteobserverprocessor" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/remoteobserverprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor" ) @@ -183,25 +184,51 @@ func verifyProcessorLifecycle(t *testing.T, factory processor.Factory, getConfig getConfigFn = factory.CreateDefaultConfig } - createFns := []createProcessorFn{ - wrapCreateLogsProc(factory), - wrapCreateTracesProc(factory), - wrapCreateMetricsProc(factory), + createFns := map[component.DataType]createProcessorFn{ + component.DataTypeLogs: wrapCreateLogsProc(factory), + component.DataTypeTraces: wrapCreateTracesProc(factory), + component.DataTypeMetrics: wrapCreateMetricsProc(factory), } - for _, createFn := range createFns { - firstExp, err := createFn(ctx, processorCreationSet, getConfigFn()) - if errors.Is(err, component.ErrDataTypeIsNotSupported) { - continue + for i := 0; i < 2; i++ { + procs := make(map[component.DataType]component.Component) + for dataType, createFn := range createFns { + proc, err := createFn(ctx, processorCreationSet, getConfigFn()) + if errors.Is(err, component.ErrDataTypeIsNotSupported) { + continue + } + require.NoError(t, err) + procs[dataType] = proc + require.NoError(t, proc.Start(ctx, host)) + } + for dataType, proc := range procs { + assert.NotPanics(t, func() { + switch dataType { + case component.DataTypeLogs: + logsProc := proc.(processor.Logs) + logs := testdata.GenerateLogsManyLogRecordsSameResource(2) + if !logsProc.Capabilities().MutatesData { + logs.MarkReadOnly() + } + assert.NoError(t, logsProc.ConsumeLogs(ctx, logs)) + case component.DataTypeMetrics: + metricsProc := proc.(processor.Metrics) + metrics := testdata.GenerateMetricsTwoMetrics() + if !metricsProc.Capabilities().MutatesData { + metrics.MarkReadOnly() + } + assert.NoError(t, metricsProc.ConsumeMetrics(ctx, metrics)) + case component.DataTypeTraces: + tracesProc := proc.(processor.Traces) + traces := testdata.GenerateTracesTwoSpansSameResource() + if !tracesProc.Capabilities().MutatesData { + traces.MarkReadOnly() + } + assert.NoError(t, tracesProc.ConsumeTraces(ctx, traces)) + } + }) + require.NoError(t, proc.Shutdown(ctx)) } - require.NoError(t, err) - require.NoError(t, firstExp.Start(ctx, host)) - require.NoError(t, firstExp.Shutdown(ctx)) - - secondExp, err := createFn(ctx, processorCreationSet, getConfigFn()) - require.NoError(t, err) - require.NoError(t, secondExp.Start(ctx, host)) - require.NoError(t, secondExp.Shutdown(ctx)) } }