diff --git a/component/componenttest/nop_connector.go b/component/componenttest/nop_connector.go new file mode 100644 index 000000000000..6afa5f84854b --- /dev/null +++ b/component/componenttest/nop_connector.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" +) + +// NewNopConnectorCreateSettings returns a new nop settings for Create*Connector functions. +func NewNopConnectorCreateSettings() component.ConnectorCreateSettings { + return component.ConnectorCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type nopConnectorConfig struct { + config.ConnectorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewNopConnectorFactory returns a component.ConnectorFactory that constructs nop processors. +func NewNopConnectorFactory() component.ConnectorFactory { + return component.NewConnectorFactory( + "nop", + func() config.Connector { + return &nopConnectorConfig{ + ConnectorSettings: config.NewConnectorSettings(config.NewComponentID("nop")), + } + }, + component.WithTracesConnector(createTracesConnector, component.StabilityLevelStable), + component.WithTracesToMetricsConnector(createTracesToMetricsConnector, component.StabilityLevelStable), + component.WithTracesToLogsConnector(createTracesToLogsConnector, component.StabilityLevelStable), + component.WithMetricsConnector(createMetricsConnector, component.StabilityLevelStable), + component.WithMetricsToTracesConnector(createMetricsToTracesConnector, component.StabilityLevelStable), + component.WithMetricsToLogsConnector(createMetricsToLogsConnector, component.StabilityLevelStable), + component.WithLogsConnector(createLogsConnector, component.StabilityLevelStable), + component.WithLogsToTracesConnector(createLogsToTracesConnector, component.StabilityLevelStable), + component.WithLogsToMetricsConnector(createLogsToMetricsConnector, component.StabilityLevelStable), + ) +} + +func createTracesConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Traces) (component.TracesConnector, error) { + return nopConnectorInstance, nil +} +func createTracesToMetricsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Metrics) (component.TracesToMetricsConnector, error) { + return nopConnectorInstance, nil +} +func createTracesToLogsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Logs) (component.TracesToLogsConnector, error) { + return nopConnectorInstance, nil +} + +func createMetricsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Metrics) (component.MetricsConnector, error) { + return nopConnectorInstance, nil +} +func createMetricsToTracesConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Traces) (component.MetricsToTracesConnector, error) { + return nopConnectorInstance, nil +} +func createMetricsToLogsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Logs) (component.MetricsToLogsConnector, error) { + return nopConnectorInstance, nil +} + +func createLogsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Logs) (component.LogsConnector, error) { + return nopConnectorInstance, nil +} +func createLogsToTracesConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Traces) (component.LogsToTracesConnector, error) { + return nopConnectorInstance, nil +} +func createLogsToMetricsConnector(context.Context, component.ConnectorCreateSettings, config.Connector, consumer.Metrics) (component.LogsToMetricsConnector, error) { + return nopConnectorInstance, nil +} + +var nopConnectorInstance = &nopConnector{ + Consumer: consumertest.NewNop(), +} + +// nopConnector stores consumed traces and metrics for testing purposes. +type nopConnector struct { + nopComponent + consumertest.Consumer +} diff --git a/component/componenttest/nop_connector_test.go b/component/componenttest/nop_connector_test.go new file mode 100644 index 000000000000..d5796881525e --- /dev/null +++ b/component/componenttest/nop_connector_test.go @@ -0,0 +1,91 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestNewNopConnectorFactory(t *testing.T) { + factory := NewNopConnectorFactory() + require.NotNil(t, factory) + assert.Equal(t, config.Type("nop"), factory.Type()) + cfg := factory.CreateDefaultConfig() + assert.Equal(t, &nopConnectorConfig{ConnectorSettings: config.NewConnectorSettings(config.NewComponentID("nop"))}, cfg) + + traces, err := factory.CreateTracesConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, traces.Start(context.Background(), NewNopHost())) + assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) + assert.NoError(t, traces.Shutdown(context.Background())) + + tracesToMetrics, err := factory.CreateTracesToMetricsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, tracesToMetrics.Start(context.Background(), NewNopHost())) + assert.NoError(t, tracesToMetrics.ConsumeTraces(context.Background(), ptrace.NewTraces())) + assert.NoError(t, tracesToMetrics.Shutdown(context.Background())) + + tracesToLogs, err := factory.CreateTracesToLogsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, tracesToLogs.Start(context.Background(), NewNopHost())) + assert.NoError(t, tracesToLogs.ConsumeTraces(context.Background(), ptrace.NewTraces())) + assert.NoError(t, tracesToLogs.Shutdown(context.Background())) + + metrics, err := factory.CreateMetricsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, metrics.Start(context.Background(), NewNopHost())) + assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) + assert.NoError(t, metrics.Shutdown(context.Background())) + + metricsToTraces, err := factory.CreateMetricsToTracesConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, metricsToTraces.Start(context.Background(), NewNopHost())) + assert.NoError(t, metricsToTraces.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) + assert.NoError(t, metricsToTraces.Shutdown(context.Background())) + + metricsToLogs, err := factory.CreateMetricsToLogsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, metricsToLogs.Start(context.Background(), NewNopHost())) + assert.NoError(t, metricsToLogs.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) + assert.NoError(t, metricsToLogs.Shutdown(context.Background())) + + logs, err := factory.CreateLogsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, logs.Start(context.Background(), NewNopHost())) + assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) + assert.NoError(t, logs.Shutdown(context.Background())) + + logsToTraces, err := factory.CreateLogsToTracesConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, logsToTraces.Start(context.Background(), NewNopHost())) + assert.NoError(t, logsToTraces.ConsumeLogs(context.Background(), plog.NewLogs())) + assert.NoError(t, logsToTraces.Shutdown(context.Background())) + + logsToMetrics, err := factory.CreateLogsToMetricsConnector(context.Background(), NewNopConnectorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.NoError(t, logsToMetrics.Start(context.Background(), NewNopHost())) + assert.NoError(t, logsToMetrics.ConsumeLogs(context.Background(), plog.NewLogs())) + assert.NoError(t, logsToMetrics.Shutdown(context.Background())) +} diff --git a/component/connector.go b/component/connector.go index 37d21ed3f52e..533100840659 100644 --- a/component/connector.go +++ b/component/connector.go @@ -35,6 +35,7 @@ type Connector interface { // then process and export the trace to the appropriate backend. type TracesConnector interface { Connector + consumer.Traces } // A TracesToMetricsConnector acts as an exporter from a traces pipeline and a receiver to a metrics pipeline. @@ -44,6 +45,7 @@ type TracesConnector interface { // For example traces could be summarized by a metrics connector that emits statistics describing the traces observed. type TracesToMetricsConnector interface { Connector + consumer.Traces } // A TracesToLogsConnector acts as an exporter from a traces pipeline and a receiver to a logs pipeline. @@ -53,6 +55,7 @@ type TracesToMetricsConnector interface { // For example traces could be analyzed by a logs connector that emits events when particular criteria are met. type TracesToLogsConnector interface { Connector + consumer.Traces } // A MetricsConnector sends metrics from one pipeline to another. @@ -64,6 +67,7 @@ type TracesToLogsConnector interface { // then process and export the metric to the appropriate backend. type MetricsConnector interface { Connector + consumer.Metrics } // A MetricsToTracesConnector acts as an exporter from a metrics pipeline and a receiver to a traces pipeline. @@ -73,6 +77,7 @@ type MetricsConnector interface { // For example latency between related data points could be modeled and emitted as traces. type MetricsToTracesConnector interface { Connector + consumer.Metrics } // A MetricsToLogsConnector acts as an exporter from a metrics pipeline and a receiver to a logs pipeline. @@ -82,6 +87,7 @@ type MetricsToTracesConnector interface { // For example metrics could be analyzed by a logs connector that emits events when particular criteria are met. type MetricsToLogsConnector interface { Connector + consumer.Metrics } // A LogsConnector sends logs from one pipeline to another. @@ -93,6 +99,7 @@ type MetricsToLogsConnector interface { // then process and export the log to the appropriate backend. type LogsConnector interface { Connector + consumer.Logs } // A LogsToTracesConnector acts as an exporter from a logs pipeline and a receiver to a traces pipeline. @@ -102,6 +109,7 @@ type LogsConnector interface { // For example structured logs containing span information could be consumed and emitted as traces. type LogsToTracesConnector interface { Connector + consumer.Logs } // A LogsToMetricsConnector acts as an exporter from a logs pipeline and a receiver to a metrics pipeline. @@ -111,6 +119,7 @@ type LogsToTracesConnector interface { // For example metrics could be extracted from structured logs that contain numeric data. type LogsToMetricsConnector interface { Connector + consumer.Logs } // ConnectorCreateSettings configures Connector creators. diff --git a/service/config_provider_test.go b/service/config_provider_test.go index a6b7189975ba..ae09efaaaf54 100644 --- a/service/config_provider_test.go +++ b/service/config_provider_test.go @@ -53,6 +53,7 @@ var configNop = &Config{ Processors: map[config.ComponentID]config.Processor{config.NewComponentID("nop"): componenttest.NewNopProcessorFactory().CreateDefaultConfig()}, Exporters: map[config.ComponentID]config.Exporter{config.NewComponentID("nop"): componenttest.NewNopExporterFactory().CreateDefaultConfig()}, Extensions: map[config.ComponentID]config.Extension{config.NewComponentID("nop"): componenttest.NewNopExtensionFactory().CreateDefaultConfig()}, + Connectors: map[config.ComponentID]config.Connector{}, Service: ConfigService{ Extensions: []config.ComponentID{config.NewComponentID("nop")}, Pipelines: map[config.ComponentID]*ConfigServicePipeline{ diff --git a/service/internal/pipelines/graph.go b/service/internal/pipelines/graph.go index 2330b357a6c1..aa6a07684eba 100644 --- a/service/internal/pipelines/graph.go +++ b/service/internal/pipelines/graph.go @@ -69,13 +69,27 @@ func (g *pipelinesGraph) ShutdownAll(ctx context.Context) error { func (g *pipelinesGraph) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter { exportersMap := make(map[config.DataType]map[config.ComponentID]component.Exporter) - exportersMap[config.TracesDataType] = make(map[config.ComponentID]component.Exporter) exportersMap[config.MetricsDataType] = make(map[config.ComponentID]component.Exporter) exportersMap[config.LogsDataType] = make(map[config.ComponentID]component.Exporter) - // TODO - + for _, pg := range g.pipelineGraphs { + for _, expNode := range pg.exporters { + expOrConnNode := g.componentGraph.Node(expNode.ID()) + expNode, ok := expOrConnNode.(*exporterNode) + if !ok { + continue + } + switch expNode.pipelineType { + case config.TracesDataType: + exportersMap[config.TracesDataType][expNode.componentID] = expNode.Component + case config.MetricsDataType: + exportersMap[config.MetricsDataType][expNode.componentID] = expNode.Component + case config.LogsDataType: + exportersMap[config.LogsDataType][expNode.componentID] = expNode.Component + } + } + } return exportersMap } diff --git a/service/testdata/otelcol-nop.yaml b/service/testdata/otelcol-nop.yaml index 6780b937e13b..96373d9f2c0d 100644 --- a/service/testdata/otelcol-nop.yaml +++ b/service/testdata/otelcol-nop.yaml @@ -27,4 +27,4 @@ service: logs: receivers: [nop] processors: [nop] - exporters: [nop] + exporters: [nop] \ No newline at end of file