diff --git a/otelcol/collector.go b/otelcol/collector.go index f695637da130..8db4e3ed0b63 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/otelcol/internal/grpclog" @@ -165,6 +166,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { Receivers: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers), Processors: processor.NewBuilder(cfg.Processors, col.set.Factories.Processors), Exporters: exporter.NewBuilder(cfg.Exporters, col.set.Factories.Exporters), + Connectors: connector.NewBuilder(cfg.Connectors, col.set.Factories.Connectors), Extensions: extension.NewBuilder(cfg.Extensions, col.set.Factories.Extensions), AsyncErrorChannel: col.asyncErrorChannel, LoggingOptions: col.set.LoggingOptions, diff --git a/otelcol/config.go b/otelcol/config.go index 1d78ee5c23f4..0d3d50796a78 100644 --- a/otelcol/config.go +++ b/otelcol/config.go @@ -101,6 +101,20 @@ func (cfg *Config) Validate() error { } } + // Validate the connector configuration. + for connID, connCfg := range cfg.Connectors { + if err := component.ValidateConfig(connCfg); err != nil { + return fmt.Errorf("connectors::%s: %w", connID, err) + } + + if _, ok := cfg.Exporters[connID]; ok { + return fmt.Errorf("connectors::%s: there's already an exporter named %q", connID, connID) + } + if _, ok := cfg.Receivers[connID]; ok { + return fmt.Errorf("connectors::%s: there's already a receiver named %q", connID, connID) + } + } + // Validate the extension configuration. for extID, extCfg := range cfg.Extensions { if err := component.ValidateConfig(extCfg); err != nil { @@ -124,14 +138,24 @@ func (cfg *Config) Validate() error { } } + // Keep track of whether connectors are used as receivers and exporters + connectorsAsReceivers := make(map[component.ID]struct{}, len(cfg.Connectors)) + connectorsAsExporters := make(map[component.ID]struct{}, len(cfg.Connectors)) + // Check that all pipelines reference only configured components. for pipelineID, pipeline := range cfg.Service.Pipelines { // Validate pipeline receiver name references. for _, ref := range pipeline.Receivers { // Check that the name referenced in the pipeline's receivers exists in the top-level receivers. - if cfg.Receivers[ref] == nil { - return fmt.Errorf("service::pipeline::%s: references receiver %q which is not configured", pipelineID, ref) + if _, ok := cfg.Receivers[ref]; ok { + continue } + + if _, ok := cfg.Connectors[ref]; ok { + connectorsAsReceivers[ref] = struct{}{} + continue + } + return fmt.Errorf("service::pipeline::%s: references receiver %q which is not configured", pipelineID, ref) } // Validate pipeline processor name references. @@ -145,9 +169,26 @@ func (cfg *Config) Validate() error { // Validate pipeline exporter name references. for _, ref := range pipeline.Exporters { // Check that the name referenced in the pipeline's Exporters exists in the top-level Exporters. - if cfg.Exporters[ref] == nil { - return fmt.Errorf("service::pipeline::%s: references exporter %q which is not configured", pipelineID, ref) + if _, ok := cfg.Exporters[ref]; ok { + continue } + if _, ok := cfg.Connectors[ref]; ok { + connectorsAsExporters[ref] = struct{}{} + continue + } + return fmt.Errorf("service::pipeline::%s: references exporter %q which is not configured", pipelineID, ref) + } + } + + // Validate that connectors are used as both receiver and exporter + for connID := range cfg.Connectors { + _, recOK := connectorsAsReceivers[connID] + _, expOK := connectorsAsExporters[connID] + if recOK && !expOK { + return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as exporter", connID) + } + if !recOK && expOK { + return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as receiver", connID) } } diff --git a/otelcol/config_test.go b/otelcol/config_test.go index 8185ac3d8c33..2a030d5b81e5 100644 --- a/otelcol/config_test.go +++ b/otelcol/config_test.go @@ -20,10 +20,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service" "go.opentelemetry.io/collector/service/telemetry" ) @@ -32,6 +34,7 @@ var ( errInvalidRecvConfig = errors.New("invalid receiver config") errInvalidExpConfig = errors.New("invalid exporter config") errInvalidProcConfig = errors.New("invalid processor config") + errInvalidConnConfig = errors.New("invalid connector config") errInvalidExtConfig = errors.New("invalid extension config") ) @@ -59,6 +62,14 @@ func (nc *nopProcConfig) Validate() error { return nc.validateErr } +type nopConnConfig struct { + validateErr error +} + +func (nc *nopConnConfig) Validate() error { + return nc.validateErr +} + type nopExtConfig struct { validateErr error } @@ -188,6 +199,83 @@ func TestConfigValidate(t *testing.T) { }, expected: fmt.Errorf(`extensions::nop: %w`, errInvalidExtConfig), }, + { + name: "invalid-connector-config", + cfgFn: func() *Config { + cfg := generateConfig() + cfg.Connectors[component.NewIDWithName("nop", "conn")] = &nopConnConfig{ + validateErr: errInvalidConnConfig, + } + return cfg + }, + expected: fmt.Errorf(`connectors::nop/conn: %w`, errInvalidConnConfig), + }, + { + name: "ambiguous-connector-name-as-receiver", + cfgFn: func() *Config { + cfg := generateConfig() + cfg.Receivers[component.NewID("nop/2")] = &nopRecvConfig{} + cfg.Connectors[component.NewID("nop/2")] = &nopConnConfig{} + pipe := cfg.Service.Pipelines[component.NewID("traces")] + pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "2")) + pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "2")) + return cfg + }, + expected: errors.New(`connectors::nop/2: there's already a receiver named "nop/2"`), + }, + { + name: "ambiguous-connector-name-as-exporter", + cfgFn: func() *Config { + cfg := generateConfig() + cfg.Exporters[component.NewID("nop/2")] = &nopExpConfig{} + cfg.Connectors[component.NewID("nop/2")] = &nopConnConfig{} + pipe := cfg.Service.Pipelines[component.NewID("traces")] + pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "2")) + pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "2")) + return cfg + }, + expected: errors.New(`connectors::nop/2: there's already an exporter named "nop/2"`), + }, + { + name: "invalid-connector-reference-as-receiver", + cfgFn: func() *Config { + cfg := generateConfig() + pipe := cfg.Service.Pipelines[component.NewID("traces")] + pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "conn2")) + return cfg + }, + expected: errors.New(`service::pipeline::traces: references receiver "nop/conn2" which is not configured`), + }, + { + name: "invalid-connector-reference-as-receiver", + cfgFn: func() *Config { + cfg := generateConfig() + pipe := cfg.Service.Pipelines[component.NewID("traces")] + pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "conn2")) + return cfg + }, + expected: errors.New(`service::pipeline::traces: references exporter "nop/conn2" which is not configured`), + }, + { + name: "missing-connector-as-receiver", + cfgFn: func() *Config { + cfg := generateConfig() + pipe := cfg.Service.Pipelines[component.NewID("traces")] + pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "conn")) + return cfg + }, + expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as receiver`), + }, + { + name: "missing-connector-as-exporter", + cfgFn: func() *Config { + cfg := generateConfig() + pipe := cfg.Service.Pipelines[component.NewID("traces")] + pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "conn")) + return cfg + }, + expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as exporter`), + }, { name: "invalid-service-config", cfgFn: func() *Config { @@ -199,6 +287,10 @@ func TestConfigValidate(t *testing.T) { }, } + require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{connectorsFeatureGateID: true})) + defer func() { + require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{connectorsFeatureGateID: false})) + }() for _, test := range testCases { t.Run(test.name, func(t *testing.T) { cfg := test.cfgFn() @@ -218,6 +310,9 @@ func generateConfig() *Config { Processors: map[component.ID]component.Config{ component.NewID("nop"): &nopProcConfig{}, }, + Connectors: map[component.ID]component.Config{ + component.NewIDWithName("nop", "conn"): &nopConnConfig{}, + }, Extensions: map[component.ID]component.Config{ component.NewID("nop"): &nopExtConfig{}, }, diff --git a/otelcol/otelcoltest/config_test.go b/otelcol/otelcoltest/config_test.go index 317cc7de8122..81a6b85d029e 100644 --- a/otelcol/otelcoltest/config_test.go +++ b/otelcol/otelcoltest/config_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service" ) @@ -70,6 +71,10 @@ func TestLoadConfigAndValidate(t *testing.T) { factories, err := NopFactories() assert.NoError(t, err) + require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{"otelcol.enableConnectors": true})) + defer func() { + require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{"otelcol.enableConnectors": false})) + }() cfgValidate, errValidate := LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories) require.NoError(t, errValidate) diff --git a/otelcol/otelcoltest/testdata/config.yaml b/otelcol/otelcoltest/testdata/config.yaml index 38227d7a68bc..d9e21c55369c 100644 --- a/otelcol/otelcoltest/testdata/config.yaml +++ b/otelcol/otelcoltest/testdata/config.yaml @@ -10,6 +10,9 @@ exporters: nop: nop/myexporter: +connectors: + nop/myconnector: + extensions: nop: nop/myextension: