Skip to content

Commit

Permalink
Add connector validation to otelcol (open-telemetry#7001)
Browse files Browse the repository at this point in the history
* Add connector validation to otelcol

* Update otelcol/config.go

Co-authored-by: Juraci Paixão Kröhling <juraci.github@kroehling.de>

* Update error message for id collisions

Co-authored-by: Juraci Paixão Kröhling <juraci.github@kroehling.de>
  • Loading branch information
djaglowski and jpkrohling authored Jan 24, 2023
1 parent 1bfb180 commit 0d9c455
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 4 deletions.
2 changes: 2 additions & 0 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol/internal/grpclog"
Expand Down Expand Up @@ -165,6 +166,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
Receivers: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers),
Processors: processor.NewBuilder(cfg.Processors, col.set.Factories.Processors),
Exporters: exporter.NewBuilder(cfg.Exporters, col.set.Factories.Exporters),
Connectors: connector.NewBuilder(cfg.Connectors, col.set.Factories.Connectors),
Extensions: extension.NewBuilder(cfg.Extensions, col.set.Factories.Extensions),
AsyncErrorChannel: col.asyncErrorChannel,
LoggingOptions: col.set.LoggingOptions,
Expand Down
49 changes: 45 additions & 4 deletions otelcol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,20 @@ func (cfg *Config) Validate() error {
}
}

// Validate the connector configuration.
for connID, connCfg := range cfg.Connectors {
if err := component.ValidateConfig(connCfg); err != nil {
return fmt.Errorf("connectors::%s: %w", connID, err)
}

if _, ok := cfg.Exporters[connID]; ok {
return fmt.Errorf("connectors::%s: there's already an exporter named %q", connID, connID)
}
if _, ok := cfg.Receivers[connID]; ok {
return fmt.Errorf("connectors::%s: there's already a receiver named %q", connID, connID)
}
}

// Validate the extension configuration.
for extID, extCfg := range cfg.Extensions {
if err := component.ValidateConfig(extCfg); err != nil {
Expand All @@ -124,14 +138,24 @@ func (cfg *Config) Validate() error {
}
}

// Keep track of whether connectors are used as receivers and exporters
connectorsAsReceivers := make(map[component.ID]struct{}, len(cfg.Connectors))
connectorsAsExporters := make(map[component.ID]struct{}, len(cfg.Connectors))

// Check that all pipelines reference only configured components.
for pipelineID, pipeline := range cfg.Service.Pipelines {
// Validate pipeline receiver name references.
for _, ref := range pipeline.Receivers {
// Check that the name referenced in the pipeline's receivers exists in the top-level receivers.
if cfg.Receivers[ref] == nil {
return fmt.Errorf("service::pipeline::%s: references receiver %q which is not configured", pipelineID, ref)
if _, ok := cfg.Receivers[ref]; ok {
continue
}

if _, ok := cfg.Connectors[ref]; ok {
connectorsAsReceivers[ref] = struct{}{}
continue
}
return fmt.Errorf("service::pipeline::%s: references receiver %q which is not configured", pipelineID, ref)
}

// Validate pipeline processor name references.
Expand All @@ -145,9 +169,26 @@ func (cfg *Config) Validate() error {
// Validate pipeline exporter name references.
for _, ref := range pipeline.Exporters {
// Check that the name referenced in the pipeline's Exporters exists in the top-level Exporters.
if cfg.Exporters[ref] == nil {
return fmt.Errorf("service::pipeline::%s: references exporter %q which is not configured", pipelineID, ref)
if _, ok := cfg.Exporters[ref]; ok {
continue
}
if _, ok := cfg.Connectors[ref]; ok {
connectorsAsExporters[ref] = struct{}{}
continue
}
return fmt.Errorf("service::pipeline::%s: references exporter %q which is not configured", pipelineID, ref)
}
}

// Validate that connectors are used as both receiver and exporter
for connID := range cfg.Connectors {
_, recOK := connectorsAsReceivers[connID]
_, expOK := connectorsAsExporters[connID]
if recOK && !expOK {
return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as exporter", connID)
}
if !recOK && expOK {
return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as receiver", connID)
}
}

Expand Down
95 changes: 95 additions & 0 deletions otelcol/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/telemetry"
)
Expand All @@ -32,6 +34,7 @@ var (
errInvalidRecvConfig = errors.New("invalid receiver config")
errInvalidExpConfig = errors.New("invalid exporter config")
errInvalidProcConfig = errors.New("invalid processor config")
errInvalidConnConfig = errors.New("invalid connector config")
errInvalidExtConfig = errors.New("invalid extension config")
)

Expand Down Expand Up @@ -59,6 +62,14 @@ func (nc *nopProcConfig) Validate() error {
return nc.validateErr
}

type nopConnConfig struct {
validateErr error
}

func (nc *nopConnConfig) Validate() error {
return nc.validateErr
}

type nopExtConfig struct {
validateErr error
}
Expand Down Expand Up @@ -188,6 +199,83 @@ func TestConfigValidate(t *testing.T) {
},
expected: fmt.Errorf(`extensions::nop: %w`, errInvalidExtConfig),
},
{
name: "invalid-connector-config",
cfgFn: func() *Config {
cfg := generateConfig()
cfg.Connectors[component.NewIDWithName("nop", "conn")] = &nopConnConfig{
validateErr: errInvalidConnConfig,
}
return cfg
},
expected: fmt.Errorf(`connectors::nop/conn: %w`, errInvalidConnConfig),
},
{
name: "ambiguous-connector-name-as-receiver",
cfgFn: func() *Config {
cfg := generateConfig()
cfg.Receivers[component.NewID("nop/2")] = &nopRecvConfig{}
cfg.Connectors[component.NewID("nop/2")] = &nopConnConfig{}
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "2"))
pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "2"))
return cfg
},
expected: errors.New(`connectors::nop/2: there's already a receiver named "nop/2"`),
},
{
name: "ambiguous-connector-name-as-exporter",
cfgFn: func() *Config {
cfg := generateConfig()
cfg.Exporters[component.NewID("nop/2")] = &nopExpConfig{}
cfg.Connectors[component.NewID("nop/2")] = &nopConnConfig{}
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "2"))
pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "2"))
return cfg
},
expected: errors.New(`connectors::nop/2: there's already an exporter named "nop/2"`),
},
{
name: "invalid-connector-reference-as-receiver",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "conn2"))
return cfg
},
expected: errors.New(`service::pipeline::traces: references receiver "nop/conn2" which is not configured`),
},
{
name: "invalid-connector-reference-as-receiver",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "conn2"))
return cfg
},
expected: errors.New(`service::pipeline::traces: references exporter "nop/conn2" which is not configured`),
},
{
name: "missing-connector-as-receiver",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "conn"))
return cfg
},
expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as receiver`),
},
{
name: "missing-connector-as-exporter",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "conn"))
return cfg
},
expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as exporter`),
},
{
name: "invalid-service-config",
cfgFn: func() *Config {
Expand All @@ -199,6 +287,10 @@ func TestConfigValidate(t *testing.T) {
},
}

require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{connectorsFeatureGateID: true}))
defer func() {
require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{connectorsFeatureGateID: false}))
}()
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
cfg := test.cfgFn()
Expand All @@ -218,6 +310,9 @@ func generateConfig() *Config {
Processors: map[component.ID]component.Config{
component.NewID("nop"): &nopProcConfig{},
},
Connectors: map[component.ID]component.Config{
component.NewIDWithName("nop", "conn"): &nopConnConfig{},
},
Extensions: map[component.ID]component.Config{
component.NewID("nop"): &nopExtConfig{},
},
Expand Down
5 changes: 5 additions & 0 deletions otelcol/otelcoltest/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service"
)

Expand Down Expand Up @@ -70,6 +71,10 @@ func TestLoadConfigAndValidate(t *testing.T) {
factories, err := NopFactories()
assert.NoError(t, err)

require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{"otelcol.enableConnectors": true}))
defer func() {
require.NoError(t, featuregate.GlobalRegistry().Apply(map[string]bool{"otelcol.enableConnectors": false}))
}()
cfgValidate, errValidate := LoadConfigAndValidate(filepath.Join("testdata", "config.yaml"), factories)
require.NoError(t, errValidate)

Expand Down
3 changes: 3 additions & 0 deletions otelcol/otelcoltest/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ exporters:
nop:
nop/myexporter:

connectors:
nop/myconnector:

extensions:
nop:
nop/myextension:
Expand Down

0 comments on commit 0d9c455

Please sign in to comment.