From 50217418ae19014a4af9491076b62fb1a9ab4dfe Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Mon, 31 Oct 2022 14:26:28 -0400 Subject: [PATCH] Test graph.go, fix bug with shared receivers/exporters --- service/internal/pipelines/graph.go | 34 +- service/internal/pipelines/graph_test.go | 349 ++++++++++++++++++ service/internal/pipelines/pipelines_test.go | 2 +- .../testdata/pipelines_simple_multi_proc.yaml | 7 +- 4 files changed, 384 insertions(+), 8 deletions(-) create mode 100644 service/internal/pipelines/graph_test.go diff --git a/service/internal/pipelines/graph.go b/service/internal/pipelines/graph.go index f6c6490918d8..e709cf4b82b9 100644 --- a/service/internal/pipelines/graph.go +++ b/service/internal/pipelines/graph.go @@ -167,8 +167,8 @@ func (g *pipelinesGraph) addReceiver( ) error { receiverNodeID := newReceiverNodeID(pipelineID.Type(), recvID) - // If already created a node for this [DataType, ComponentID] nothing to do. - if g.componentGraph.Node(receiverNodeID.ID()) != nil { + if rcvrNode := g.componentGraph.Node(receiverNodeID.ID()); rcvrNode != nil { + g.pipelineGraphs[pipelineID].addReceiver(rcvrNode) return nil } @@ -228,8 +228,8 @@ func (g *pipelinesGraph) addExporter( ) error { exporterNodeID := newExporterNodeID(pipelineID.Type(), exprID) - // If already created a node for this [DataType, ComponentID] nothing to do. - if g.componentGraph.Node(exporterNodeID.ID()) != nil { + if expNode := g.componentGraph.Node(exporterNodeID.ID()); expNode != nil { + g.pipelineGraphs[pipelineID].addExporter(expNode) return nil } @@ -432,6 +432,32 @@ func (g *pipelinesGraph) GetExporters() map[config.DataType]map[config.Component return exportersMap } +func (g *pipelinesGraph) getReceivers() map[config.DataType]map[config.ComponentID]component.Receiver { + receiversMap := make(map[config.DataType]map[config.ComponentID]component.Receiver) + receiversMap[config.TracesDataType] = make(map[config.ComponentID]component.Receiver) + receiversMap[config.MetricsDataType] = make(map[config.ComponentID]component.Receiver) + receiversMap[config.LogsDataType] = make(map[config.ComponentID]component.Receiver) + + for _, pg := range g.pipelineGraphs { + for _, rcvrNode := range pg.receivers { + rcvrOrConnNode := g.componentGraph.Node(rcvrNode.ID()) + rcvrNode, ok := rcvrOrConnNode.(*receiverNode) + if !ok { + continue + } + switch rcvrNode.pipelineType { + case config.TracesDataType: + receiversMap[config.TracesDataType][rcvrNode.componentID] = rcvrNode.Component + case config.MetricsDataType: + receiversMap[config.MetricsDataType][rcvrNode.componentID] = rcvrNode.Component + case config.LogsDataType: + receiversMap[config.LogsDataType][rcvrNode.componentID] = rcvrNode.Component + } + } + } + return receiversMap +} + // TODO handle Capabilities func (g *pipelinesGraph) HandleZPages(w http.ResponseWriter, r *http.Request) { diff --git a/service/internal/pipelines/graph_test.go b/service/internal/pipelines/graph_test.go new file mode 100644 index 000000000000..a7bdd64b7c9a --- /dev/null +++ b/service/internal/pipelines/graph_test.go @@ -0,0 +1,349 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/internal/testdata" + "go.opentelemetry.io/collector/service/internal/testcomponents" +) + +func TestGraphNewPipelinesGraph(t *testing.T) { + tests := []struct { + name string + receiverIDs []config.ComponentID + processorIDs []config.ComponentID + exporterIDs []config.ComponentID + expectedRequests int + }{ + { + name: "pipelines_simple.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_simple_multi_proc.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor"), config.NewComponentIDWithName("exampleprocessor", "1")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_simple_no_proc.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_multi.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver"), config.NewComponentIDWithName("examplereceiver", "1")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor"), config.NewComponentIDWithName("exampleprocessor", "1")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "1")}, + expectedRequests: 2, + }, + { + name: "pipelines_multi_no_proc.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver"), config.NewComponentIDWithName("examplereceiver", "1")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "1")}, + expectedRequests: 2, + }, + { + name: "pipelines_exporter_multi_pipeline.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 2, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + factories, err := testcomponents.ExampleComponents() + assert.NoError(t, err) + + cfg := loadConfig(t, filepath.Join("testdata", test.name), factories) + + // Build the pipeline + pipelinesInterface, err := NewPipelinesGraph(context.Background(), toSettings(factories, cfg)) + assert.NoError(t, err) + + pipelines, ok := pipelinesInterface.(*pipelinesGraph) + require.True(t, ok) + + assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + + // Verify exporters created, started and empty. + for _, expID := range test.exporterIDs { + traceExporter := pipelines.GetExporters()[config.TracesDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, traceExporter.Started) + assert.Equal(t, len(traceExporter.Traces), 0) + + // Validate metrics. + metricsExporter := pipelines.GetExporters()[config.MetricsDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, metricsExporter.Started) + assert.Zero(t, len(metricsExporter.Traces)) + + // Validate logs. + logsExporter := pipelines.GetExporters()[config.LogsDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, logsExporter.Started) + assert.Zero(t, len(logsExporter.Traces)) + } + + // Verify processors created in the given order and started. + for i, procID := range test.processorIDs { + tracesNode := pipelines.pipelineGraphs[config.NewComponentID(config.TracesDataType)].processors[i] + tracesProcessor := tracesNode.(*processorNode) + assert.Equal(t, procID, tracesProcessor.componentID) + assert.True(t, tracesProcessor.Component.(*testcomponents.ExampleProcessor).Started) + + // Validate metrics. + metricsNode := pipelines.pipelineGraphs[config.NewComponentID(config.MetricsDataType)].processors[i] + metricsProcessor := metricsNode.(*processorNode) + assert.Equal(t, procID, metricsProcessor.componentID) + assert.True(t, metricsProcessor.Component.(*testcomponents.ExampleProcessor).Started) + + // Validate logs. + logsNode := pipelines.pipelineGraphs[config.NewComponentID(config.LogsDataType)].processors[i] + logsProcessor := logsNode.(*processorNode) + assert.Equal(t, procID, logsProcessor.componentID) + assert.True(t, logsProcessor.Component.(*testcomponents.ExampleProcessor).Started) + } + + // Verify receivers created, started and send data to confirm pipelines correctly connected. + for _, recvID := range test.receiverIDs { + traceReceiver := pipelines.getReceivers()[config.TracesDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, traceReceiver.Started) + // Send traces. + assert.NoError(t, traceReceiver.ConsumeTraces(context.Background(), testdata.GenerateTraces(1))) + + metricsReceiver := pipelines.getReceivers()[config.MetricsDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, metricsReceiver.Started) + // Send metrics. + assert.NoError(t, metricsReceiver.ConsumeMetrics(context.Background(), testdata.GenerateMetrics(1))) + + logsReceiver := pipelines.getReceivers()[config.LogsDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, logsReceiver.Started) + // Send logs. + assert.NoError(t, logsReceiver.ConsumeLogs(context.Background(), testdata.GenerateLogs(1))) + } + + assert.NoError(t, pipelines.ShutdownAll(context.Background())) + + // Verify receivers shutdown. + for _, recvID := range test.receiverIDs { + traceReceiver := pipelines.getReceivers()[config.TracesDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, traceReceiver.Stopped) + + metricsReceiver := pipelines.getReceivers()[config.MetricsDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, metricsReceiver.Stopped) + + logsReceiver := pipelines.getReceivers()[config.LogsDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, logsReceiver.Stopped) + } + + // Verify processors shutdown. + for i := range test.processorIDs { + traceNode := pipelines.pipelineGraphs[config.NewComponentID(config.TracesDataType)].processors[i] + traceProcessor := traceNode.(*processorNode) + assert.True(t, traceProcessor.Component.(*testcomponents.ExampleProcessor).Stopped) + + // Validate metrics. + metricsNode := pipelines.pipelineGraphs[config.NewComponentID(config.MetricsDataType)].processors[i] + metricsProcessor := metricsNode.(*processorNode) + assert.True(t, metricsProcessor.Component.(*testcomponents.ExampleProcessor).Stopped) + + // Validate logs. + logsNode := pipelines.pipelineGraphs[config.NewComponentID(config.LogsDataType)].processors[i] + logsProcessor := logsNode.(*processorNode) + assert.True(t, logsProcessor.Component.(*testcomponents.ExampleProcessor).Stopped) + } + + // Now verify that exporters received data, and are shutdown. + for _, expID := range test.exporterIDs { + // Validate traces. + traceExporter := pipelines.GetExporters()[config.TracesDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, traceExporter.Traces, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateTraces(1), traceExporter.Traces[0]) + assert.True(t, traceExporter.Stopped) + + // Validate metrics. + metricsExporter := pipelines.GetExporters()[config.MetricsDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, metricsExporter.Metrics, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateMetrics(1), metricsExporter.Metrics[0]) + assert.True(t, metricsExporter.Stopped) + + // Validate logs. + logsExporter := pipelines.GetExporters()[config.LogsDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, logsExporter.Logs, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateLogs(1), logsExporter.Logs[0]) + assert.True(t, logsExporter.Stopped) + } + }) + } +} + +func TestGraphBuildErrors(t *testing.T) { + nopReceiverFactory := componenttest.NewNopReceiverFactory() + nopProcessorFactory := componenttest.NewNopProcessorFactory() + nopExporterFactory := componenttest.NewNopExporterFactory() + badReceiverFactory := newBadReceiverFactory() + badProcessorFactory := newBadProcessorFactory() + badExporterFactory := newBadExporterFactory() + + tests := []struct { + configFile string + }{ + {configFile: "not_supported_exporter_logs.yaml"}, + {configFile: "not_supported_exporter_metrics.yaml"}, + {configFile: "not_supported_exporter_traces.yaml"}, + {configFile: "not_supported_processor_logs.yaml"}, + {configFile: "not_supported_processor_metrics.yaml"}, + {configFile: "not_supported_processor_traces.yaml"}, + {configFile: "not_supported_receiver_traces.yaml"}, + {configFile: "not_supported_receiver_metrics.yaml"}, + {configFile: "not_supported_receiver_traces.yaml"}, + {configFile: "unknown_exporter_config.yaml"}, + {configFile: "unknown_exporter_factory.yaml"}, + {configFile: "unknown_processor_config.yaml"}, + {configFile: "unknown_processor_factory.yaml"}, + {configFile: "unknown_receiver_config.yaml"}, + {configFile: "unknown_receiver_factory.yaml"}, + } + + for _, test := range tests { + t.Run(test.configFile, func(t *testing.T) { + factories := component.Factories{ + Receivers: map[config.Type]component.ReceiverFactory{ + nopReceiverFactory.Type(): nopReceiverFactory, + "unknown": nopReceiverFactory, + badReceiverFactory.Type(): badReceiverFactory, + }, + Processors: map[config.Type]component.ProcessorFactory{ + nopProcessorFactory.Type(): nopProcessorFactory, + "unknown": nopProcessorFactory, + badProcessorFactory.Type(): badProcessorFactory, + }, + Exporters: map[config.Type]component.ExporterFactory{ + nopExporterFactory.Type(): nopExporterFactory, + "unknown": nopExporterFactory, + badExporterFactory.Type(): badExporterFactory, + }, + } + + // Need the unknown factories to do unmarshalling. + cfg := loadConfig(t, filepath.Join("testdata", test.configFile), factories) + + // Remove the unknown factories, so they are NOT available during building. + delete(factories.Exporters, "unknown") + delete(factories.Processors, "unknown") + delete(factories.Receivers, "unknown") + + _, err := NewPipelinesGraph(context.Background(), toSettings(factories, cfg)) + assert.Error(t, err) + }) + } +} + +func TestGraphFailToStartAndShutdown(t *testing.T) { + errReceiverFactory := newErrReceiverFactory() + errProcessorFactory := newErrProcessorFactory() + errExporterFactory := newErrExporterFactory() + nopReceiverFactory := componenttest.NewNopReceiverFactory() + nopProcessorFactory := componenttest.NewNopProcessorFactory() + nopExporterFactory := componenttest.NewNopExporterFactory() + + set := Settings{ + Telemetry: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + ReceiverFactories: map[config.Type]component.ReceiverFactory{ + nopReceiverFactory.Type(): nopReceiverFactory, + errReceiverFactory.Type(): errReceiverFactory, + }, + ReceiverConfigs: map[config.ComponentID]config.Receiver{ + config.NewComponentID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), + config.NewComponentID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), + }, + ProcessorFactories: map[config.Type]component.ProcessorFactory{ + nopProcessorFactory.Type(): nopProcessorFactory, + errProcessorFactory.Type(): errProcessorFactory, + }, + ProcessorConfigs: map[config.ComponentID]config.Processor{ + config.NewComponentID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), + config.NewComponentID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), + }, + ExporterFactories: map[config.Type]component.ExporterFactory{ + nopExporterFactory.Type(): nopExporterFactory, + errExporterFactory.Type(): errExporterFactory, + }, + ExporterConfigs: map[config.ComponentID]config.Exporter{ + config.NewComponentID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), + config.NewComponentID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), + }, + } + + for _, dt := range []config.DataType{config.TracesDataType, config.MetricsDataType, config.LogsDataType} { + t.Run(string(dt)+"/receiver", func(t *testing.T) { + set.PipelineConfigs = map[config.ComponentID]*config.Pipeline{ + config.NewComponentID(dt): { + Receivers: []config.ComponentID{config.NewComponentID("nop"), config.NewComponentID("err")}, + Processors: []config.ComponentID{config.NewComponentID("nop")}, + Exporters: []config.ComponentID{config.NewComponentID("nop")}, + }, + } + pipelines, err := NewPipelinesGraph(context.Background(), set) + assert.NoError(t, err) + assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.ShutdownAll(context.Background())) + }) + + t.Run(string(dt)+"/processor", func(t *testing.T) { + set.PipelineConfigs = map[config.ComponentID]*config.Pipeline{ + config.NewComponentID(dt): { + Receivers: []config.ComponentID{config.NewComponentID("nop")}, + Processors: []config.ComponentID{config.NewComponentID("nop"), config.NewComponentID("err")}, + Exporters: []config.ComponentID{config.NewComponentID("nop")}, + }, + } + pipelines, err := NewPipelinesGraph(context.Background(), set) + assert.NoError(t, err) + assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.ShutdownAll(context.Background())) + }) + + t.Run(string(dt)+"/exporter", func(t *testing.T) { + set.PipelineConfigs = map[config.ComponentID]*config.Pipeline{ + config.NewComponentID(dt): { + Receivers: []config.ComponentID{config.NewComponentID("nop")}, + Processors: []config.ComponentID{config.NewComponentID("nop")}, + Exporters: []config.ComponentID{config.NewComponentID("nop"), config.NewComponentID("err")}, + }, + } + pipelines, err := NewPipelinesGraph(context.Background(), set) + assert.NoError(t, err) + assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.ShutdownAll(context.Background())) + }) + } +} diff --git a/service/internal/pipelines/pipelines_test.go b/service/internal/pipelines/pipelines_test.go index d49a8a53fdd8..badfc42c6958 100644 --- a/service/internal/pipelines/pipelines_test.go +++ b/service/internal/pipelines/pipelines_test.go @@ -53,7 +53,7 @@ func TestBuild(t *testing.T) { { name: "pipelines_simple_multi_proc.yaml", receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, - processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor"), config.NewComponentID("exampleprocessor")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor"), config.NewComponentIDWithName("exampleprocessor", "1")}, exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, expectedRequests: 1, }, diff --git a/service/internal/pipelines/testdata/pipelines_simple_multi_proc.yaml b/service/internal/pipelines/testdata/pipelines_simple_multi_proc.yaml index bb51c870843b..262f41ce6ab6 100644 --- a/service/internal/pipelines/testdata/pipelines_simple_multi_proc.yaml +++ b/service/internal/pipelines/testdata/pipelines_simple_multi_proc.yaml @@ -3,6 +3,7 @@ receivers: processors: exampleprocessor: + exampleprocessor/1: exporters: exampleexporter: @@ -11,15 +12,15 @@ service: pipelines: traces: receivers: [examplereceiver] - processors: [exampleprocessor, exampleprocessor] + processors: [exampleprocessor, exampleprocessor/1] exporters: [exampleexporter] metrics: receivers: [examplereceiver] - processors: [exampleprocessor, exampleprocessor] + processors: [exampleprocessor, exampleprocessor/1] exporters: [exampleexporter] logs: receivers: [examplereceiver] - processors: [exampleprocessor, exampleprocessor] + processors: [exampleprocessor, exampleprocessor/1] exporters: [exampleexporter]