Skip to content

Commit

Permalink
Prototype "connectors", a new type of pipeline component
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Nov 29, 2022
1 parent 4ff1ff3 commit 67b2867
Show file tree
Hide file tree
Showing 98 changed files with 6,414 additions and 89 deletions.
22 changes: 22 additions & 0 deletions .chloggen/connectors-prototype-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: connectors

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add "connectors", a new type of pipeline component

# One or more tracking issues or pull requests related to the change
issues: [2336]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- Connectors connect pipelines. A connector acts as exporter and a receiver working together.
For example, a `nopconnector`` may be used to replicate signals by exporting data from one
pipeline and receiving the data on two other pipelines, which can each process a copy of the data
in different ways.
Connectors can also derive one signal from another. For example, a `countconnector` can be used as
an exporter on a logs pipeline and emit metrics, describing the number of logs, onto a metrics pipeline.
8 changes: 8 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ updates:
directory: "/component"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/connector/countconnector"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/connector/nopconnector"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/consumer"
schedule:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
bin/
dist/
local/

# GoLand IDEA
/.idea/
Expand Down
14 changes: 13 additions & 1 deletion cmd/builder/internal/builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
Extensions []Module `mapstructure:"extensions"`
Receivers []Module `mapstructure:"receivers"`
Processors []Module `mapstructure:"processors"`
Connectors []Module `mapstructure:"connectors"`
Replaces []string `mapstructure:"replaces"`
Excludes []string `mapstructure:"excludes"`
}
Expand Down Expand Up @@ -90,7 +91,13 @@ func NewDefaultConfig() Config {

// Validate checks whether the current configuration is valid
func (c *Config) Validate() error {
return multierr.Combine(validateModules(c.Extensions), validateModules(c.Receivers), validateModules(c.Exporters), validateModules(c.Processors))
return multierr.Combine(
validateModules(c.Extensions),
validateModules(c.Receivers),
validateModules(c.Exporters),
validateModules(c.Processors),
validateModules(c.Connectors),
)
}

// SetGoPath sets go path
Expand Down Expand Up @@ -133,6 +140,11 @@ func (c *Config) ParseModules() error {
return err
}

c.Connectors, err = parseModules(c.Connectors)
if err != nil {
return err
}

return nil
}

Expand Down
12 changes: 12 additions & 0 deletions cmd/builder/internal/builder/templates/components.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package main

import (
"go.opentelemetry.io/collector/component"
{{- range .Connectors}}
{{.Name}} "{{.Import}}"
{{- end}}
{{- range .Exporters}}
{{.Name}} "{{.Import}}"
{{- end}}
Expand Down Expand Up @@ -31,6 +34,15 @@ func components() (component.Factories, error) {
return component.Factories{}, err
}

factories.Connectors, err = component.MakeConnectorFactoryMap(
{{- range .Connectors}}
{{.Name}}.NewFactory(),
{{- end}}
)
if err != nil {
return component.Factories{}, err
}

factories.Receivers, err = component.MakeReceiverFactoryMap(
{{- range .Receivers}}
{{.Name}}.NewFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ func TestValidateConfigs(t *testing.T) {
for _, factory := range factories.Extensions {
assert.NoError(t, componenttest.CheckConfigStruct(factory.CreateDefaultConfig()))
}
for _, factory := range factories.Connectors {
assert.NoError(t, componenttest.CheckConfigStruct(factory.CreateDefaultConfig()))
}
}
7 changes: 7 additions & 0 deletions cmd/builder/internal/builder/templates/go.mod.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ require (
{{- range .Processors}}
{{if .GoMod}}{{.GoMod}}{{end}}
{{- end}}
{{- range .Connectors}}
{{if .GoMod}}{{.GoMod}}{{end}}
{{- end}}
go.opentelemetry.io/collector v{{.Distribution.OtelColVersion}}
)

Expand All @@ -32,6 +35,10 @@ require (
{{- range .Processors}}
{{if ne .Path ""}}replace {{.GoMod}} => {{.Path}}{{end}}
{{- end}}
{{- range .Connectors}}
{{if ne .Path ""}}replace {{.GoMod}} => {{.Path}}{{end}}
{{- end}}

{{- range .Replaces}}
replace {{.}}
{{- end}}
Expand Down
1 change: 1 addition & 0 deletions cmd/builder/internal/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func applyCfgFromFile(flags *flag.FlagSet, cfgFromFile builder.Config) {
cfg.Extensions = cfgFromFile.Extensions
cfg.Receivers = cfgFromFile.Receivers
cfg.Processors = cfgFromFile.Processors
cfg.Connectors = cfgFromFile.Connectors
cfg.Replaces = cfgFromFile.Replaces
cfg.Excludes = cfgFromFile.Excludes

Expand Down
5 changes: 5 additions & 0 deletions cmd/builder/internal/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,9 @@ extensions:
processors:
- gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.65.0
- gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.65.0
connectors:
- import: go.opentelemetry.io/collector/connector/countconnector
gomod: go.opentelemetry.io/collector/connector/countconnector v0.0.0
- import: go.opentelemetry.io/collector/connector/nopconnector
gomod: go.opentelemetry.io/collector/connector/nopconnector v0.0.0

7 changes: 7 additions & 0 deletions cmd/otelcorecol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@ extensions:
processors:
- gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.65.0
- gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.65.0
connectors:
- import: go.opentelemetry.io/collector/connector/countconnector
gomod: go.opentelemetry.io/collector/connector/countconnector v0.0.0
- import: go.opentelemetry.io/collector/connector/nopconnector
gomod: go.opentelemetry.io/collector/connector/nopconnector v0.0.0

replaces:
- go.opentelemetry.io/collector => ../../
- go.opentelemetry.io/collector/component => ../../component
- go.opentelemetry.io/collector/consumer => ../../consumer
- go.opentelemetry.io/collector/connector/countconnector => ../../connector/countconnector
- go.opentelemetry.io/collector/connector/nopconnector => ../../connector/nopconnector
- go.opentelemetry.io/collector/exporter/loggingexporter => ../../exporter/loggingexporter
- go.opentelemetry.io/collector/exporter/otlpexporter => ../../exporter/otlpexporter
- go.opentelemetry.io/collector/exporter/otlphttpexporter => ../../exporter/otlphttpexporter
Expand Down
10 changes: 10 additions & 0 deletions cmd/otelcorecol/components.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cmd/otelcorecol/components_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ require (
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/collector v0.65.0
go.opentelemetry.io/collector/component v0.65.0
go.opentelemetry.io/collector/connector/countconnector v0.0.0
go.opentelemetry.io/collector/connector/nopconnector v0.0.0
go.opentelemetry.io/collector/exporter/loggingexporter v0.65.0
go.opentelemetry.io/collector/exporter/otlpexporter v0.65.0
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.65.0
Expand Down Expand Up @@ -82,6 +84,7 @@ require (
go.uber.org/zap v1.23.0 // indirect
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect
golang.org/x/text v0.4.0 // indirect
gonum.org/v1/gonum v0.12.0 // indirect
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
Expand All @@ -95,6 +98,10 @@ replace go.opentelemetry.io/collector/component => ../../component

replace go.opentelemetry.io/collector/consumer => ../../consumer

replace go.opentelemetry.io/collector/connector/countconnector => ../../connector/countconnector

replace go.opentelemetry.io/collector/connector/nopconnector => ../../connector/nopconnector

replace go.opentelemetry.io/collector/exporter/loggingexporter => ../../exporter/loggingexporter

replace go.opentelemetry.io/collector/exporter/otlpexporter => ../../exporter/otlpexporter
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcorecol/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand Down Expand Up @@ -674,6 +675,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
Expand Down
130 changes: 130 additions & 0 deletions component/componenttest/nop_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest // import "go.opentelemetry.io/collector/component/componenttest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// NewNopConnectorCreateSettings returns a new nop settings for Create*Connector functions.
func NewNopConnectorCreateSettings() component.ConnectorCreateSettings {
return component.ConnectorCreateSettings{
TelemetrySettings: NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}

type nopConnectorConfig struct {
config.ConnectorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// NewNopConnectorFactory returns a component.ConnectorFactory that constructs nop processors.
func NewNopConnectorFactory() component.ConnectorFactory {
return component.NewConnectorFactory(
"nop",
func() component.Config {
return &nopConnectorConfig{
ConnectorSettings: config.NewConnectorSettings(component.NewID("nop")),
}
},
component.WithTracesToTracesConnector(createTracesToTracesConnector, component.StabilityLevelStable),
component.WithTracesToMetricsConnector(createTracesToMetricsConnector, component.StabilityLevelStable),
component.WithTracesToLogsConnector(createTracesToLogsConnector, component.StabilityLevelStable),
component.WithMetricsToTracesConnector(createMetricsToTracesConnector, component.StabilityLevelStable),
component.WithMetricsToMetricsConnector(createMetricsToMetricsConnector, component.StabilityLevelStable),
component.WithMetricsToLogsConnector(createMetricsToLogsConnector, component.StabilityLevelStable),
component.WithLogsToTracesConnector(createLogsToTracesConnector, component.StabilityLevelStable),
component.WithLogsToMetricsConnector(createLogsToMetricsConnector, component.StabilityLevelStable),
component.WithLogsToLogsConnector(createLogsToLogsConnector, component.StabilityLevelStable),
)
}

func createTracesToTracesConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Traces) (component.TracesToTracesConnector, error) {
return nopConnectorInstance, nil
}
func createTracesToMetricsConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Metrics) (component.TracesToMetricsConnector, error) {
return nopConnectorInstance, nil
}
func createTracesToLogsConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Logs) (component.TracesToLogsConnector, error) {
return nopConnectorInstance, nil
}

func createMetricsToTracesConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Traces) (component.MetricsToTracesConnector, error) {
return nopConnectorInstance, nil
}
func createMetricsToMetricsConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Metrics) (component.MetricsToMetricsConnector, error) {
return nopConnectorInstance, nil
}
func createMetricsToLogsConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Logs) (component.MetricsToLogsConnector, error) {
return nopConnectorInstance, nil
}

func createLogsToTracesConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Traces) (component.LogsToTracesConnector, error) {
return nopConnectorInstance, nil
}
func createLogsToMetricsConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Metrics) (component.LogsToMetricsConnector, error) {
return nopConnectorInstance, nil
}
func createLogsToLogsConnector(context.Context, component.ConnectorCreateSettings, component.Config, consumer.Logs) (component.LogsToLogsConnector, error) {
return nopConnectorInstance, nil
}

var nopConnectorInstance = &nopConnector{
Consumer: consumertest.NewNop(),
}

// nopConnector stores consumed traces and metrics for testing purposes.
type nopConnector struct {
nopComponent
consumertest.Consumer
}

func (c *nopConnector) ConsumeTracesToTraces(ctx context.Context, td ptrace.Traces) error {
return nil
}
func (c *nopConnector) ConsumeTracesToMetrics(ctx context.Context, td ptrace.Traces) error {
return nil
}
func (c *nopConnector) ConsumeTracesToLogs(ctx context.Context, td ptrace.Traces) error {
return nil
}

func (c *nopConnector) ConsumeMetricsToTraces(ctx context.Context, md pmetric.Metrics) error {
return nil
}
func (c *nopConnector) ConsumeMetricsToMetrics(ctx context.Context, md pmetric.Metrics) error {
return nil
}
func (c *nopConnector) ConsumeMetricsToLogs(ctx context.Context, md pmetric.Metrics) error {
return nil
}

func (c *nopConnector) ConsumeLogsToTraces(ctx context.Context, ld plog.Logs) error {
return nil
}
func (c *nopConnector) ConsumeLogsToMetrics(ctx context.Context, ld plog.Logs) error {
return nil
}
func (c *nopConnector) ConsumeLogsToLogs(ctx context.Context, ld plog.Logs) error {
return nil
}
Loading

0 comments on commit 67b2867

Please sign in to comment.