Skip to content

Commit

Permalink
Extract service.pipelines interface, add skeleton graph implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Dec 27, 2022
1 parent f83774b commit a24a76e
Show file tree
Hide file tree
Showing 20 changed files with 625 additions and 441 deletions.
1 change: 1 addition & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/text v0.5.0 // indirect
gonum.org/v1/gonum v0.12.0 // indirect
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcorecol/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand Down Expand Up @@ -679,6 +680,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
Expand Down
176 changes: 175 additions & 1 deletion connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -85,7 +86,10 @@ type Logs interface {

// CreateSettings configures Connector creators.
type CreateSettings struct {
TelemetrySettings component.TelemetrySettings
// ID returns the ID of the component that will be created.
ID component.ID

component.TelemetrySettings

// BuildInfo can be used by components for informational purposes
BuildInfo component.BuildInfo
Expand Down Expand Up @@ -453,3 +457,173 @@ func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefa
}
return f
}

// Builder processor is a helper struct that given a set of Configs and Factories helps with creating processors.
type Builder struct {
cfgs map[component.ID]component.Config
factories map[component.Type]Factory
}

// NewBuilder creates a new connector.Builder to help with creating components form a set of configs and factories.
func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder {
return &Builder{cfgs: cfgs, factories: factories}
}

// CreateTracesToTraces creates a Traces connector based on the settings and config.
func (b *Builder) CreateTracesToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.TracesToTracesStability())
return f.CreateTracesToTraces(ctx, set, cfg, next)
}

// CreateTracesToMetrics creates a Traces connector based on the settings and config.
func (b *Builder) CreateTracesToMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Traces, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.TracesToMetricsStability())
return f.CreateTracesToMetrics(ctx, set, cfg, next)
}

// CreateTracesToLogs creates a Traces connector based on the settings and config.
func (b *Builder) CreateTracesToLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Traces, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.TracesToLogsStability())
return f.CreateTracesToLogs(ctx, set, cfg, next)
}

// CreateMetricsToTraces creates a Metrics connector based on the settings and config.
func (b *Builder) CreateMetricsToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Metrics, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.MetricsToTracesStability())
return f.CreateMetricsToTraces(ctx, set, cfg, next)
}

// CreateMetricsToMetrics creates a Metrics connector based on the settings and config.
func (b *Builder) CreateMetricsToMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.MetricsToMetricsStability())
return f.CreateMetricsToMetrics(ctx, set, cfg, next)
}

// CreateMetricsToLogs creates a Metrics connector based on the settings and config.
func (b *Builder) CreateMetricsToLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Metrics, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.MetricsToLogsStability())
return f.CreateMetricsToLogs(ctx, set, cfg, next)
}

// CreateLogsToTraces creates a Logs connector based on the settings and config.
func (b *Builder) CreateLogsToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Logs, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.LogsToTracesStability())
return f.CreateLogsToTraces(ctx, set, cfg, next)
}

// CreateLogsToMetrics creates a Logs connector based on the settings and config.
func (b *Builder) CreateLogsToMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Logs, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.LogsToMetricsStability())
return f.CreateLogsToMetrics(ctx, set, cfg, next)
}

// CreateLogsToLogs creates a Logs connector based on the settings and config.
func (b *Builder) CreateLogsToLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) {
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("connector %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("connector factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.LogsToLogsStability())
return f.CreateLogsToLogs(ctx, set, cfg, next)
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}

// logStabilityLevel logs the stability level of a component. The log level is set to info for
// undefined, unmaintained, deprecated and development. The log level is set to debug
// for alpha, beta and stable.
func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) {
if sl >= component.StabilityLevelAlpha {
logger.Debug(sl.LogMessage())
} else {
logger.Info(sl.LogMessage())
}
}
10 changes: 10 additions & 0 deletions connector/connectortest/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
)

const typeStr = "nop"

// NewNopCreateSettings returns a new nop settings for Create* functions.
func NewNopCreateSettings() connector.CreateSettings {
return connector.CreateSettings{
Expand Down Expand Up @@ -95,3 +97,11 @@ type nopConnector struct {
component.ShutdownFunc
consumertest.Consumer
}

// NewNopBuilder returns a connector.Builder that constructs nop receivers.
func NewNopBuilder() *connector.Builder {
nopFactory := NewNopFactory()
return connector.NewBuilder(
map[component.ID]component.Config{component.NewID(typeStr): nopFactory.CreateDefaultConfig()},
map[component.Type]connector.Factory{typeStr: nopFactory})
}
64 changes: 64 additions & 0 deletions connector/connectortest/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,67 @@ func TestNewNopConnectorFactory(t *testing.T) {
assert.NoError(t, logsToLogs.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.NoError(t, logsToLogs.Shutdown(context.Background()))
}

func TestNewNopBuilder(t *testing.T) {
builder := NewNopBuilder()
require.NotNil(t, builder)

factory := NewNopFactory()
cfg := factory.CreateDefaultConfig()
set := NewNopCreateSettings()
set.ID = component.NewID(typeStr)

tracesToTraces, err := factory.CreateTracesToTraces(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bTracesToTraces, err := builder.CreateTracesToTraces(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, tracesToTraces, bTracesToTraces)

tracesToMetrics, err := factory.CreateTracesToMetrics(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bTracesToMetrics, err := builder.CreateTracesToMetrics(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, tracesToMetrics, bTracesToMetrics)

tracesToLogs, err := factory.CreateTracesToLogs(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bTracesToLogs, err := builder.CreateTracesToLogs(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, tracesToLogs, bTracesToLogs)

metricsToTraces, err := factory.CreateMetricsToTraces(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bMetricsToTraces, err := builder.CreateMetricsToTraces(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, metricsToTraces, bMetricsToTraces)

metricsToMetrics, err := factory.CreateMetricsToMetrics(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bMetricsToMetrics, err := builder.CreateMetricsToMetrics(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, metricsToMetrics, bMetricsToMetrics)

metricsToLogs, err := factory.CreateMetricsToLogs(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bMetricsToLogs, err := builder.CreateMetricsToLogs(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, metricsToLogs, bMetricsToLogs)

logsToTraces, err := factory.CreateLogsToTraces(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bLogsToTraces, err := builder.CreateLogsToTraces(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, logsToTraces, bLogsToTraces)

logsToMetrics, err := factory.CreateLogsToMetrics(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bLogsToMetrics, err := builder.CreateLogsToMetrics(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, logsToMetrics, bLogsToMetrics)

logsToLogs, err := factory.CreateLogsToLogs(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
bLogsToLogs, err := builder.CreateLogsToLogs(context.Background(), set, consumertest.NewNop())
require.NoError(t, err)
assert.IsType(t, logsToLogs, bLogsToLogs)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
go.uber.org/zap v1.24.0
golang.org/x/net v0.4.0
golang.org/x/sys v0.3.0
gonum.org/v1/gonum v0.12.0
google.golang.org/grpc v1.51.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand Down Expand Up @@ -684,6 +685,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
Expand Down
Loading

0 comments on commit a24a76e

Please sign in to comment.