From 0ac0bb85eb7c08a7abfede7431dfab0827e0df11 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Wed, 8 Jan 2025 11:41:26 -0600 Subject: [PATCH 1/3] Add singleton flags to factories --- connector/connector.go | 21 +++++++++++++++++++++ connector/connector_test.go | 7 ++++++- connector/xconnector/connector.go | 7 +++++++ connector/xconnector/connector_test.go | 5 +++++ exporter/exporter.go | 20 ++++++++++++++++++++ exporter/exporter_test.go | 6 +++++- exporter/xexporter/exporter.go | 7 +++++++ receiver/otlpreceiver/factory.go | 1 + receiver/receiver.go | 20 ++++++++++++++++++++ receiver/receiver_test.go | 6 +++++- receiver/xreceiver/profiles.go | 7 +++++++ 11 files changed, 104 insertions(+), 3 deletions(-) diff --git a/connector/connector.go b/connector/connector.go index 2ae78f26a6b..64a038d2a95 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -112,9 +112,17 @@ type Factory interface { LogsToMetricsStability() component.StabilityLevel LogsToLogsStability() component.StabilityLevel + // Metadata returns the metadata describing the receiver. + Metadata() Metadata + unexportedFactoryFunc() } +// Metadata contains metadata describing the component that is created by the factory. +type Metadata struct { + SingletonInstance bool +} + // FactoryOption applies changes to Factory. type FactoryOption interface { // apply applies the option. @@ -301,6 +309,13 @@ func WithLogsToLogs(createLogsToLogs CreateLogsToLogsFunc, sl component.Stabilit }) } +// AsSingletonInstance indicates that the factory always returns the same instance of the component. +func AsSingletonInstance() FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.metadata.SingletonInstance = true + }) +} + // factory implements the Factory interface. type factory struct { cfgType component.Type @@ -329,6 +344,8 @@ type factory struct { logsToTracesStabilityLevel component.StabilityLevel logsToMetricsStabilityLevel component.StabilityLevel logsToLogsStabilityLevel component.StabilityLevel + + metadata Metadata } // Type returns the type of component. @@ -374,6 +391,10 @@ func (f *factory) LogsToLogsStability() component.StabilityLevel { return f.logsToLogsStabilityLevel } +func (f *factory) Metadata() Metadata { + return f.metadata +} + // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { f := &factory{ diff --git a/connector/connector_test.go b/connector/connector_test.go index e190f5a4c31..9a31f829054 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -48,6 +48,8 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalMetrics)) _, err = factory.CreateLogsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalLogs)) + + assert.False(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithSameTypes(t *testing.T) { @@ -55,7 +57,8 @@ func TestNewFactoryWithSameTypes(t *testing.T) { factory := NewFactory(testType, func() component.Config { return &defaultCfg }, WithTracesToTraces(createTracesToTraces, component.StabilityLevelAlpha), WithMetricsToMetrics(createMetricsToMetrics, component.StabilityLevelBeta), - WithLogsToLogs(createLogsToLogs, component.StabilityLevelUnmaintained)) + WithLogsToLogs(createLogsToLogs, component.StabilityLevelUnmaintained), + AsSingletonInstance()) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) @@ -85,6 +88,8 @@ func TestNewFactoryWithSameTypes(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalTraces)) _, err = factory.CreateLogsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalMetrics)) + + assert.True(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithTranslateTypes(t *testing.T) { diff --git a/connector/xconnector/connector.go b/connector/xconnector/connector.go index d697e06a8e4..412e361a3d0 100644 --- a/connector/xconnector/connector.go +++ b/connector/xconnector/connector.go @@ -269,6 +269,13 @@ func WithProfilesToLogs(createProfilesToLogs CreateProfilesToLogsFunc, sl compon }) } +// AsSingletonInstance sets the connector as a singleton instance. +func AsSingletonInstance() FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.AsSingletonInstance()) + }) +} + // factory implements the Factory interface. type factory struct { connector.Factory diff --git a/connector/xconnector/connector_test.go b/connector/xconnector/connector_test.go index 636e76450ed..6c096557c0d 100644 --- a/connector/xconnector/connector_test.go +++ b/connector/xconnector/connector_test.go @@ -44,12 +44,15 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) + + assert.False(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithSameTypes(t *testing.T) { defaultCfg := struct{}{} factory := NewFactory(testType, func() component.Config { return &defaultCfg }, WithProfilesToProfiles(createProfilesToProfiles, component.StabilityLevelAlpha), + AsSingletonInstance(), ) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) @@ -64,6 +67,8 @@ func TestNewFactoryWithSameTypes(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) + + assert.True(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithTranslateTypes(t *testing.T) { diff --git a/exporter/exporter.go b/exporter/exporter.go index 0e54bfb1b3c..38ed08aafa9 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -72,9 +72,17 @@ type Factory interface { // LogsStability gets the stability level of the Logs exporter. LogsStability() component.StabilityLevel + // Metadata returns the metadata describing the receiver. + Metadata() Metadata + unexportedFactoryFunc() } +// Metadata contains metadata describing the component that is created by the factory. +type Metadata struct { + SingletonInstance bool +} + // FactoryOption apply changes to Factory. type FactoryOption interface { // applyOption applies the option. @@ -132,6 +140,7 @@ type factory struct { metricsStabilityLevel component.StabilityLevel CreateLogsFunc logsStabilityLevel component.StabilityLevel + metadata Metadata } func (f *factory) Type() component.Type { @@ -152,6 +161,10 @@ func (f *factory) LogsStability() component.StabilityLevel { return f.logsStabilityLevel } +func (f *factory) Metadata() Metadata { + return f.metadata +} + // WithTraces overrides the default "error not supported" implementation for Factory.CreateTraces and the default "undefined" stability level. func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption { return factoryOptionFunc(func(o *factory) { @@ -176,6 +189,13 @@ func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOpt }) } +// AsSingletonInstance indicates that the factory always returns the same instance of the component. +func AsSingletonInstance() FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.metadata.SingletonInstance = true + }) +} + // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { f := &factory{ diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index d363e2af665..a9dc24117ad 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -29,6 +29,7 @@ func TestNewFactory(t *testing.T) { require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) + assert.False(t, f.Metadata().SingletonInstance) } func TestNewFactoryWithOptions(t *testing.T) { @@ -39,7 +40,8 @@ func TestNewFactoryWithOptions(t *testing.T) { func() component.Config { return &defaultCfg }, WithTraces(createTraces, component.StabilityLevelDevelopment), WithMetrics(createMetrics, component.StabilityLevelAlpha), - WithLogs(createLogs, component.StabilityLevelDeprecated)) + WithLogs(createLogs, component.StabilityLevelDeprecated), + AsSingletonInstance()) assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) @@ -54,6 +56,8 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.Equal(t, component.StabilityLevelDeprecated, f.LogsStability()) _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg) assert.NoError(t, err) + + assert.True(t, f.Metadata().SingletonInstance) } func TestMakeFactoryMap(t *testing.T) { diff --git a/exporter/xexporter/exporter.go b/exporter/xexporter/exporter.go index 7d83e92c8a6..a9ff55af3f6 100644 --- a/exporter/xexporter/exporter.go +++ b/exporter/xexporter/exporter.go @@ -88,6 +88,13 @@ func WithProfiles(createProfiles CreateProfilesFunc, sl component.StabilityLevel }) } +// AsSingletonInstance sets the exporter as a singleton instance. +func AsSingletonInstance() FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, exporter.AsSingletonInstance()) + }) +} + type factory struct { exporter.Factory CreateProfilesFunc diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index cb7cc0a6c7d..e810a0e0038 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -34,6 +34,7 @@ func NewFactory() receiver.Factory { xreceiver.WithMetrics(createMetrics, metadata.MetricsStability), xreceiver.WithLogs(createLog, metadata.LogsStability), xreceiver.WithProfiles(createProfiles, metadata.ProfilesStability), + xreceiver.AsSingletonInstance(), ) } diff --git a/receiver/receiver.go b/receiver/receiver.go index dd7242d5f77..24c1182fc3a 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -84,9 +84,17 @@ type Factory interface { // LogsStability gets the stability level of the Logs receiver. LogsStability() component.StabilityLevel + // Metadata returns the metadata describing the receiver. + Metadata() Metadata + unexportedFactoryFunc() } +// Metadata contains metadata describing the component that is created by the factory. +type Metadata struct { + SingletonInstance bool +} + // FactoryOption apply changes to Factory. type FactoryOption interface { // applyOption applies the option. @@ -142,6 +150,7 @@ type factory struct { metricsStabilityLevel component.StabilityLevel CreateLogsFunc logsStabilityLevel component.StabilityLevel + metadata Metadata } func (f *factory) Type() component.Type { @@ -162,6 +171,10 @@ func (f *factory) LogsStability() component.StabilityLevel { return f.logsStabilityLevel } +func (f *factory) Metadata() Metadata { + return f.metadata +} + // WithTraces overrides the default "error not supported" implementation for Factory.CreateTraces and the default "undefined" stability level. func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption { return factoryOptionFunc(func(o *factory) { @@ -186,6 +199,13 @@ func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOpt }) } +// AsSingletonInstance indicates that the factory always returns the same instance of the component. +func AsSingletonInstance() FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.metadata.SingletonInstance = true + }) +} + // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { f := &factory{ diff --git a/receiver/receiver_test.go b/receiver/receiver_test.go index eb8af181019..3ce1635b59d 100644 --- a/receiver/receiver_test.go +++ b/receiver/receiver_test.go @@ -30,6 +30,7 @@ func TestNewFactory(t *testing.T) { require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) require.ErrorIs(t, err, pipeline.ErrSignalNotSupported) + assert.False(t, f.Metadata().SingletonInstance) } func TestNewFactoryWithOptions(t *testing.T) { @@ -40,7 +41,8 @@ func TestNewFactoryWithOptions(t *testing.T) { func() component.Config { return &defaultCfg }, WithTraces(createTraces, component.StabilityLevelDeprecated), WithMetrics(createMetrics, component.StabilityLevelAlpha), - WithLogs(createLogs, component.StabilityLevelStable)) + WithLogs(createLogs, component.StabilityLevelStable), + AsSingletonInstance()) assert.EqualValues(t, testType, f.Type()) assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig()) @@ -55,6 +57,8 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.Equal(t, component.StabilityLevelStable, f.LogsStability()) _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, nil) assert.NoError(t, err) + + assert.True(t, f.Metadata().SingletonInstance) } func TestMakeFactoryMap(t *testing.T) { diff --git a/receiver/xreceiver/profiles.go b/receiver/xreceiver/profiles.go index 2e0cbbd8cee..19a17afadce 100644 --- a/receiver/xreceiver/profiles.go +++ b/receiver/xreceiver/profiles.go @@ -105,6 +105,13 @@ func WithProfiles(createProfiles CreateProfilesFunc, sl component.StabilityLevel }) } +// AsSingletonInstance sets the receiver as a singleton instance. +func AsSingletonInstance() FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, receiver.AsSingletonInstance()) + }) +} + // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { opts := factoryOpts{factory: &factory{}} From 819f5a6d42c08349768df0d852652691d0cd2dcb Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 3 Oct 2024 16:24:58 -0400 Subject: [PATCH 2/3] [chore][graph] Remodel node id as attribute sets --- service/internal/graph/attribute/attribute.go | 100 +++++++++++++++++ .../graph/attribute/attribute_test.go | 104 ++++++++++++++++++ service/internal/graph/capabilities.go | 7 +- service/internal/graph/connector.go | 7 +- service/internal/graph/exporter.go | 7 +- service/internal/graph/fanout.go | 7 +- service/internal/graph/graph_test.go | 18 +-- service/internal/graph/node.go | 22 ---- service/internal/graph/processor.go | 7 +- service/internal/graph/receiver.go | 7 +- service/internal/graph/util_test.go | 5 +- 11 files changed, 235 insertions(+), 56 deletions(-) create mode 100644 service/internal/graph/attribute/attribute.go create mode 100644 service/internal/graph/attribute/attribute_test.go delete mode 100644 service/internal/graph/node.go diff --git a/service/internal/graph/attribute/attribute.go b/service/internal/graph/attribute/attribute.go new file mode 100644 index 00000000000..e81b1f9be23 --- /dev/null +++ b/service/internal/graph/attribute/attribute.go @@ -0,0 +1,100 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attribute // import "go.opentelemetry.io/collector/service/internal/graph/attribute" + +import ( + "fmt" + "hash/fnv" + + "go.opentelemetry.io/otel/attribute" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" +) + +const ( + componentKindKey = "otelcol.component.kind" + componentIDKey = "otelcol.component.id" + pipelineIDKey = "otelcol.pipeline.id" + signalKey = "otelcol.signal" + signalOutputKey = "otelcol.signal.output" + + receiverKind = "receiver" + processorKind = "processor" + exporterKind = "exporter" + connectorKind = "connector" + capabiltiesKind = "capabilities" + fanoutKind = "fanout" +) + +type Attributes struct { + set attribute.Set + id int64 +} + +func newAttributes(attrs ...attribute.KeyValue) *Attributes { + h := fnv.New64a() + for _, kv := range attrs { + h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")")) + } + return &Attributes{ + set: attribute.NewSet(attrs...), + id: int64(h.Sum64()), // #nosec G115 + } +} + +func (a Attributes) Attributes() *attribute.Set { + return &a.set +} + +func (a Attributes) ID() int64 { + return a.id +} + +func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, receiverKind), + attribute.String(signalKey, pipelineType.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, processorKind), + attribute.String(signalKey, pipelineID.Signal().String()), + attribute.String(pipelineIDKey, pipelineID.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, exporterKind), + attribute.String(signalKey, pipelineType.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, connectorKind), + attribute.String(signalKey, fmt.Sprintf("%s_to_%s", exprPipelineType.String(), rcvrPipelineType.String())), + attribute.String(componentIDKey, id.String()), + ) +} + +func Capabilities(pipelineID pipeline.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, capabiltiesKind), + attribute.String(pipelineIDKey, pipelineID.String()), + ) +} + +func Fanout(pipelineID pipeline.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, fanoutKind), + attribute.String(pipelineIDKey, pipelineID.String()), + ) +} diff --git a/service/internal/graph/attribute/attribute_test.go b/service/internal/graph/attribute/attribute_test.go new file mode 100644 index 00000000000..db2b32dc197 --- /dev/null +++ b/service/internal/graph/attribute/attribute_test.go @@ -0,0 +1,104 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attribute + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/pipelineprofiles" +) + +var ( + signals = []pipeline.Signal{ + pipeline.SignalTraces, + pipeline.SignalMetrics, + pipeline.SignalLogs, + pipelineprofiles.SignalProfiles, + } + + cIDs = []component.ID{ + component.MustNewID("foo"), + component.MustNewID("foo2"), + component.MustNewID("bar"), + } + + pIDs = []pipeline.ID{ + pipeline.MustNewID("traces"), + pipeline.MustNewIDWithName("traces", "2"), + pipeline.MustNewID("metrics"), + pipeline.MustNewIDWithName("metrics", "2"), + pipeline.MustNewID("logs"), + pipeline.MustNewIDWithName("logs", "2"), + pipeline.MustNewID("profiles"), + pipeline.MustNewIDWithName("profiles", "2"), + } +) + +func TestAttributes(t *testing.T) { + // The sets are created independently but should be exactly equivalent. + // We will ensure that corresponding elements are equal and that + // non-corresponding elements are not equal. + setI, setJ := createExampleSets(), createExampleSets() + for i, ei := range setI { + for j, ej := range setJ { + if i == j { + require.Equal(t, ei.ID(), ej.ID()) + require.True(t, ei.Attributes().Equals(ej.Attributes())) + } else { + require.NotEqual(t, ei.ID(), ej.ID()) + require.False(t, ei.Attributes().Equals(ej.Attributes())) + } + } + } +} + +func createExampleSets() []*Attributes { + sets := []*Attributes{} + + // Receiver examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Receiver(sig, id)) + } + } + + // Processor examples. + for _, pID := range pIDs { + for _, cID := range cIDs { + sets = append(sets, Processor(pID, cID)) + } + } + + // Exporter examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Exporter(sig, id)) + } + } + + // Connector examples. + for _, exprSig := range signals { + for _, rcvrSig := range signals { + for _, id := range cIDs { + sets = append(sets, Connector(exprSig, rcvrSig, id)) + } + } + } + + // Capabilities examples. + for _, pID := range pIDs { + sets = append(sets, Capabilities(pID)) + } + + // Fanout examples. + for _, pID := range pIDs { + sets = append(sets, Fanout(pID)) + } + + return sets +} diff --git a/service/internal/graph/capabilities.go b/service/internal/graph/capabilities.go index 128e6fde926..ec8e2a576c0 100644 --- a/service/internal/graph/capabilities.go +++ b/service/internal/graph/capabilities.go @@ -7,10 +7,9 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const capabilitiesSeed = "capabilities" - var _ consumerNode = (*capabilitiesNode)(nil) // Every pipeline has a "virtual" capabilities node immediately after the receiver(s). @@ -19,7 +18,7 @@ var _ consumerNode = (*capabilitiesNode)(nil) // 2. Present a consistent "first consumer" for each pipeline. // The nodeID is derived from "pipeline ID". type capabilitiesNode struct { - nodeID + *attribute.Attributes pipelineID pipeline.ID baseConsumer consumer.ConsumeTracesFunc @@ -30,7 +29,7 @@ type capabilitiesNode struct { func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode { return &capabilitiesNode{ - nodeID: newNodeID(capabilitiesSeed, pipelineID.String()), + Attributes: attribute.Capabilities(pipelineID), pipelineID: pipelineID, } } diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index 1f654454ee6..df780a20b4b 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -16,16 +16,15 @@ import ( "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const connectorSeed = "connector" - var _ consumerNode = (*connectorNode)(nil) // A connector instance connects one pipeline type to one other pipeline type. // Therefore, nodeID is derived from "exporter pipeline type", "receiver pipeline type", and "component ID". type connectorNode struct { - nodeID + *attribute.Attributes componentID component.ID exprPipelineType pipeline.Signal rcvrPipelineType pipeline.Signal @@ -34,7 +33,7 @@ type connectorNode struct { func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode { return &connectorNode{ - nodeID: newNodeID(connectorSeed, connID.String(), exprPipelineType.String(), rcvrPipelineType.String()), + Attributes: attribute.Connector(exprPipelineType, rcvrPipelineType, connID), componentID: connID, exprPipelineType: exprPipelineType, rcvrPipelineType: rcvrPipelineType, diff --git a/service/internal/graph/exporter.go b/service/internal/graph/exporter.go index ab7d0f6392b..5598121c6cc 100644 --- a/service/internal/graph/exporter.go +++ b/service/internal/graph/exporter.go @@ -13,16 +13,15 @@ import ( "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const exporterSeed = "exporter" - var _ consumerNode = (*exporterNode)(nil) // An exporter instance can be shared by multiple pipelines of the same type. // Therefore, nodeID is derived from "pipeline type" and "component ID". type exporterNode struct { - nodeID + *attribute.Attributes componentID component.ID pipelineType pipeline.Signal component.Component @@ -30,7 +29,7 @@ type exporterNode struct { func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { return &exporterNode{ - nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()), + Attributes: attribute.Exporter(pipelineType, exprID), componentID: exprID, pipelineType: pipelineType, } diff --git a/service/internal/graph/fanout.go b/service/internal/graph/fanout.go index 13c8d4ad1c5..7d1a76f086f 100644 --- a/service/internal/graph/fanout.go +++ b/service/internal/graph/fanout.go @@ -5,23 +5,22 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph" import ( "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const fanOutToExporters = "fanout_to_exporters" - var _ consumerNode = (*fanOutNode)(nil) // Each pipeline has one fan-out node before exporters. // Therefore, nodeID is derived from "pipeline ID". type fanOutNode struct { - nodeID + *attribute.Attributes pipelineID pipeline.ID baseConsumer } func newFanOutNode(pipelineID pipeline.ID) *fanOutNode { return &fanOutNode{ - nodeID: newNodeID(fanOutToExporters, pipelineID.String()), + Attributes: attribute.Fanout(pipelineID), pipelineID: pipelineID, } } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index be4a98e4135..b5d245154dd 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -2231,11 +2231,11 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/conn1" (traces to traces) -> ` + - `processor "nop" in pipeline "traces/2" -> ` + `connector "nop/conn" (traces to traces) -> ` + `processor "nop" in pipeline "traces/1" -> ` + - `connector "nop/conn1" (traces to traces)`, + `connector "nop/conn1" (traces to traces) -> ` + + `processor "nop" in pipeline "traces/2" -> ` + + `connector "nop/conn" (traces to traces)`, }, { name: "not_allowed_deep_cycle_metrics.yaml", @@ -2321,11 +2321,11 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/conn1" (logs to logs) -> ` + - `processor "nop" in pipeline "logs/2" -> ` + `connector "nop/conn" (logs to logs) -> ` + `processor "nop" in pipeline "logs/1" -> ` + - `connector "nop/conn1" (logs to logs)`, + `connector "nop/conn1" (logs to logs) -> ` + + `processor "nop" in pipeline "logs/2" -> ` + + `connector "nop/conn" (logs to logs)`, }, { name: "not_allowed_deep_cycle_profiles.yaml", @@ -2427,13 +2427,13 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + + `connector "nop/forkagain" (traces to traces) -> ` + + `processor "nop" in pipeline "traces/copy2b" -> ` + `connector "nop/rawlog" (traces to logs) -> ` + `processor "nop" in pipeline "logs/raw" -> ` + `connector "nop/fork" (logs to traces) -> ` + `processor "nop" in pipeline "traces/copy2" -> ` + - `connector "nop/forkagain" (traces to traces) -> ` + - `processor "nop" in pipeline "traces/copy2b" -> ` + - `connector "nop/rawlog" (traces to logs)`, + `connector "nop/forkagain" (traces to traces)`, }, { name: "unknown_exporter_config", diff --git a/service/internal/graph/node.go b/service/internal/graph/node.go deleted file mode 100644 index 81946a79df4..00000000000 --- a/service/internal/graph/node.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package graph // import "go.opentelemetry.io/collector/service/internal/graph" - -import ( - "hash/fnv" - "strings" -) - -type nodeID int64 - -func (n nodeID) ID() int64 { - return int64(n) -} - -func newNodeID(parts ...string) nodeID { - h := fnv.New64a() - h.Write([]byte(strings.Join(parts, "|"))) - //nolint:gosec - return nodeID(h.Sum64()) -} diff --git a/service/internal/graph/processor.go b/service/internal/graph/processor.go index 3288a505d80..cfe36cb461c 100644 --- a/service/internal/graph/processor.go +++ b/service/internal/graph/processor.go @@ -15,16 +15,15 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const processorSeed = "processor" - var _ consumerNode = (*processorNode)(nil) // Every processor instance is unique to one pipeline. // Therefore, nodeID is derived from "pipeline ID" and "component ID". type processorNode struct { - nodeID + *attribute.Attributes componentID component.ID pipelineID pipeline.ID component.Component @@ -32,7 +31,7 @@ type processorNode struct { func newProcessorNode(pipelineID pipeline.ID, procID component.ID) *processorNode { return &processorNode{ - nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()), + Attributes: attribute.Processor(pipelineID, procID), componentID: procID, pipelineID: pipelineID, } diff --git a/service/internal/graph/receiver.go b/service/internal/graph/receiver.go index 48f7a36d148..57dfcded117 100644 --- a/service/internal/graph/receiver.go +++ b/service/internal/graph/receiver.go @@ -16,14 +16,13 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const receiverSeed = "receiver" - // A receiver instance can be shared by multiple pipelines of the same type. // Therefore, nodeID is derived from "pipeline type" and "component ID". type receiverNode struct { - nodeID + *attribute.Attributes componentID component.ID pipelineType pipeline.Signal component.Component @@ -31,7 +30,7 @@ type receiverNode struct { func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode { return &receiverNode{ - nodeID: newNodeID(receiverSeed, pipelineType.String(), recvID.String()), + Attributes: attribute.Receiver(pipelineType, recvID), componentID: recvID, pipelineType: pipelineType, } diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go index 60be0a3d57a..5e27bf6f340 100644 --- a/service/internal/graph/util_test.go +++ b/service/internal/graph/util_test.go @@ -6,6 +6,7 @@ package graph import ( "context" "errors" + "hash/fnv" "sync" "go.opentelemetry.io/collector/component" @@ -37,7 +38,9 @@ type testNode struct { // ID satisfies the graph.Node interface, allowing // testNode to be used in a simple.DirectedGraph func (n *testNode) ID() int64 { - return int64(newNodeID(n.id.String())) + h := fnv.New64a() + h.Write([]byte(n.id.String())) + return int64(h.Sum64()) // #nosec G115 } func (n *testNode) Start(ctx context.Context, _ component.Host) error { From c72b516bb3fff1030699edcee932422f95e080a1 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Wed, 8 Jan 2025 13:36:35 -0600 Subject: [PATCH 3/3] Use attributes in loggers --- ...ngleton-flags-and-attributes-exporter.yaml | 25 ++ ...ngleton-flags-and-attributes-receiver.yaml | 25 ++ connector/connector.go | 21 -- connector/connector_test.go | 7 +- connector/xconnector/connector.go | 7 - connector/xconnector/connector_test.go | 5 - exporter/exporter_test.go | 2 +- exporter/xexporter/exporter_test.go | 1 + receiver/receiver_test.go | 2 +- receiver/xreceiver/receiver_test.go | 1 + service/extensions/extensions.go | 6 +- service/internal/attribute/attribute.go | 23 ++ service/internal/attribute/attribute_test.go | 32 +++ service/internal/builders/connector.go | 2 +- service/internal/builders/exporter.go | 2 +- service/internal/builders/processor.go | 2 +- service/internal/builders/receiver.go | 2 +- service/internal/components/loggers.go | 58 ----- service/internal/components/package_test.go | 14 -- service/internal/graph/attribute/attribute.go | 100 -------- .../graph/attribute/attribute_test.go | 104 -------- service/internal/graph/capabilities.go | 2 +- service/internal/graph/connector.go | 5 +- service/internal/graph/exporter.go | 57 +++-- service/internal/graph/fanout.go | 2 +- service/internal/graph/graph.go | 64 ++++- service/internal/graph/graph_test.go | 232 +++++++++++++++++- service/internal/graph/processor.go | 5 +- service/internal/graph/receiver.go | 85 ++++--- service/internal/graph/util_test.go | 4 +- .../testcomponents/example_receiver.go | 45 +--- .../testcomponents/singleton_exporter.go | 124 ++++++++++ .../testcomponents/singleton_exporter_test.go | 46 ++++ .../testcomponents/singleton_receiver.go | 105 ++++++++ .../testcomponents/singleton_receiver_test.go | 26 ++ 35 files changed, 793 insertions(+), 450 deletions(-) create mode 100644 .chloggen/singleton-flags-and-attributes-exporter.yaml create mode 100644 .chloggen/singleton-flags-and-attributes-receiver.yaml delete mode 100644 service/internal/components/loggers.go delete mode 100644 service/internal/components/package_test.go delete mode 100644 service/internal/graph/attribute/attribute.go delete mode 100644 service/internal/graph/attribute/attribute_test.go create mode 100644 service/internal/testcomponents/singleton_exporter.go create mode 100644 service/internal/testcomponents/singleton_exporter_test.go create mode 100644 service/internal/testcomponents/singleton_receiver.go create mode 100644 service/internal/testcomponents/singleton_receiver_test.go diff --git a/.chloggen/singleton-flags-and-attributes-exporter.yaml b/.chloggen/singleton-flags-and-attributes-exporter.yaml new file mode 100644 index 00000000000..80a0fe4f74f --- /dev/null +++ b/.chloggen/singleton-flags-and-attributes-exporter.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `Metadata` method to `Factory` interface. + +# One or more tracking issues or pull requests related to the change +issues: [12057] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/singleton-flags-and-attributes-receiver.yaml b/.chloggen/singleton-flags-and-attributes-receiver.yaml new file mode 100644 index 00000000000..0a1e81bc61d --- /dev/null +++ b/.chloggen/singleton-flags-and-attributes-receiver.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: receiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `Metadata` method to `Factory` interface. + +# One or more tracking issues or pull requests related to the change +issues: [12057] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/connector/connector.go b/connector/connector.go index 64a038d2a95..2ae78f26a6b 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -112,17 +112,9 @@ type Factory interface { LogsToMetricsStability() component.StabilityLevel LogsToLogsStability() component.StabilityLevel - // Metadata returns the metadata describing the receiver. - Metadata() Metadata - unexportedFactoryFunc() } -// Metadata contains metadata describing the component that is created by the factory. -type Metadata struct { - SingletonInstance bool -} - // FactoryOption applies changes to Factory. type FactoryOption interface { // apply applies the option. @@ -309,13 +301,6 @@ func WithLogsToLogs(createLogsToLogs CreateLogsToLogsFunc, sl component.Stabilit }) } -// AsSingletonInstance indicates that the factory always returns the same instance of the component. -func AsSingletonInstance() FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.metadata.SingletonInstance = true - }) -} - // factory implements the Factory interface. type factory struct { cfgType component.Type @@ -344,8 +329,6 @@ type factory struct { logsToTracesStabilityLevel component.StabilityLevel logsToMetricsStabilityLevel component.StabilityLevel logsToLogsStabilityLevel component.StabilityLevel - - metadata Metadata } // Type returns the type of component. @@ -391,10 +374,6 @@ func (f *factory) LogsToLogsStability() component.StabilityLevel { return f.logsToLogsStabilityLevel } -func (f *factory) Metadata() Metadata { - return f.metadata -} - // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { f := &factory{ diff --git a/connector/connector_test.go b/connector/connector_test.go index 9a31f829054..e190f5a4c31 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -48,8 +48,6 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalMetrics)) _, err = factory.CreateLogsToLogs(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalLogs)) - - assert.False(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithSameTypes(t *testing.T) { @@ -57,8 +55,7 @@ func TestNewFactoryWithSameTypes(t *testing.T) { factory := NewFactory(testType, func() component.Config { return &defaultCfg }, WithTracesToTraces(createTracesToTraces, component.StabilityLevelAlpha), WithMetricsToMetrics(createMetricsToMetrics, component.StabilityLevelBeta), - WithLogsToLogs(createLogsToLogs, component.StabilityLevelUnmaintained), - AsSingletonInstance()) + WithLogsToLogs(createLogsToLogs, component.StabilityLevelUnmaintained)) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) @@ -88,8 +85,6 @@ func TestNewFactoryWithSameTypes(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalTraces)) _, err = factory.CreateLogsToMetrics(context.Background(), Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, pipeline.SignalMetrics)) - - assert.True(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithTranslateTypes(t *testing.T) { diff --git a/connector/xconnector/connector.go b/connector/xconnector/connector.go index 412e361a3d0..d697e06a8e4 100644 --- a/connector/xconnector/connector.go +++ b/connector/xconnector/connector.go @@ -269,13 +269,6 @@ func WithProfilesToLogs(createProfilesToLogs CreateProfilesToLogsFunc, sl compon }) } -// AsSingletonInstance sets the connector as a singleton instance. -func AsSingletonInstance() FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.AsSingletonInstance()) - }) -} - // factory implements the Factory interface. type factory struct { connector.Factory diff --git a/connector/xconnector/connector_test.go b/connector/xconnector/connector_test.go index 6c096557c0d..636e76450ed 100644 --- a/connector/xconnector/connector_test.go +++ b/connector/xconnector/connector_test.go @@ -44,15 +44,12 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) - - assert.False(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithSameTypes(t *testing.T) { defaultCfg := struct{}{} factory := NewFactory(testType, func() component.Config { return &defaultCfg }, WithProfilesToProfiles(createProfilesToProfiles, component.StabilityLevelAlpha), - AsSingletonInstance(), ) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) @@ -67,8 +64,6 @@ func TestNewFactoryWithSameTypes(t *testing.T) { assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) - - assert.True(t, factory.Metadata().SingletonInstance) } func TestNewFactoryWithTranslateTypes(t *testing.T) { diff --git a/exporter/exporter_test.go b/exporter/exporter_test.go index a9dc24117ad..fcc5f5a0b68 100644 --- a/exporter/exporter_test.go +++ b/exporter/exporter_test.go @@ -55,7 +55,7 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.Equal(t, component.StabilityLevelDeprecated, f.LogsStability()) _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, f.Metadata().SingletonInstance) } diff --git a/exporter/xexporter/exporter_test.go b/exporter/xexporter/exporter_test.go index d1929c4c22e..f7287d2807b 100644 --- a/exporter/xexporter/exporter_test.go +++ b/exporter/xexporter/exporter_test.go @@ -21,6 +21,7 @@ func TestNewFactoryWithProfiles(t *testing.T) { testType, func() component.Config { return &defaultCfg }, WithProfiles(createProfiles, component.StabilityLevelDevelopment), + AsSingletonInstance(), ) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) diff --git a/receiver/receiver_test.go b/receiver/receiver_test.go index 3ce1635b59d..d685909ea51 100644 --- a/receiver/receiver_test.go +++ b/receiver/receiver_test.go @@ -56,7 +56,7 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.Equal(t, component.StabilityLevelStable, f.LogsStability()) _, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, nil) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, f.Metadata().SingletonInstance) } diff --git a/receiver/xreceiver/receiver_test.go b/receiver/xreceiver/receiver_test.go index 06ff1f592f9..8229174e416 100644 --- a/receiver/xreceiver/receiver_test.go +++ b/receiver/xreceiver/receiver_test.go @@ -22,6 +22,7 @@ func TestNewFactoryWithProfiles(t *testing.T) { testType, func() component.Config { return &defaultCfg }, WithProfiles(createProfiles, component.StabilityLevelAlpha), + AsSingletonInstance(), ) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index fcefcb33f20..dd5204ac182 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -17,8 +17,8 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensioncapabilities" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -38,7 +38,7 @@ type Extensions struct { func (bes *Extensions) Start(ctx context.Context, host component.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for _, extID := range bes.extensionIDs { - extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) + extLogger := attribute.Extension(extID).Logger(bes.telemetry.Logger) extLogger.Info("Extension is starting...") instanceID := bes.instanceIDs[extID] ext := bes.extMap[extID] @@ -216,7 +216,7 @@ func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Ext BuildInfo: set.BuildInfo, ModuleInfo: set.ModuleInfo, } - extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID) + extSet.TelemetrySettings.Logger = attribute.Extension(extID).Logger(set.Telemetry.Logger) ext, err := set.Extensions.Create(ctx, extSet) if err != nil { diff --git a/service/internal/attribute/attribute.go b/service/internal/attribute/attribute.go index 7bfdf217961..bd38093e4cd 100644 --- a/service/internal/attribute/attribute.go +++ b/service/internal/attribute/attribute.go @@ -7,6 +7,7 @@ import ( "hash/fnv" "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pipeline" @@ -47,6 +48,14 @@ func (a Attributes) ID() int64 { return a.id } +func (a Attributes) Logger(logger *zap.Logger) *zap.Logger { + fields := make([]zap.Field, 0, a.set.Len()) + for _, kv := range a.set.ToSlice() { + fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString())) + } + return logger.With(fields...) +} + func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { return newAttributes( attribute.String(componentKindKey, component.KindReceiver.String()), @@ -55,6 +64,13 @@ func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { ) } +func ReceiverSingleton(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindReceiver.String()), + attribute.String(componentIDKey, id.String()), + ) +} + func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { return newAttributes( attribute.String(componentKindKey, component.KindProcessor.String()), @@ -72,6 +88,13 @@ func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes { ) } +func ExporterSingleton(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindExporter.String()), + attribute.String(componentIDKey, id.String()), + ) +} + func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes { return newAttributes( attribute.String(componentKindKey, component.KindConnector.String()), diff --git a/service/internal/attribute/attribute_test.go b/service/internal/attribute/attribute_test.go index 6025f77a20b..eb554953d67 100644 --- a/service/internal/attribute/attribute_test.go +++ b/service/internal/attribute/attribute_test.go @@ -58,6 +58,19 @@ func TestReceiver(t *testing.T) { } } +func TestReceiverSingleton(t *testing.T) { + for _, id := range cIDs { + r := ReceiverSingleton(id) + componentKind, ok := r.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindReceiver.String(), componentKind.AsString()) + + componentID, ok := r.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } +} + func TestProcessor(t *testing.T) { for _, pID := range pIDs { for _, id := range cIDs { @@ -96,6 +109,19 @@ func TestExporter(t *testing.T) { } } +func TestExporterSingleton(t *testing.T) { + for _, id := range cIDs { + e := ExporterSingleton(id) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExporter.String(), componentKind.AsString()) + + componentID, ok := e.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } +} + func TestConnector(t *testing.T) { for _, exprSig := range signals { for _, rcvrSig := range signals { @@ -155,6 +181,9 @@ func createExampleSets() []*Attributes { sets = append(sets, Receiver(sig, id)) } } + for _, id := range cIDs { + sets = append(sets, ReceiverSingleton(id)) + } // Processor examples. for _, pID := range pIDs { @@ -169,6 +198,9 @@ func createExampleSets() []*Attributes { sets = append(sets, Exporter(sig, id)) } } + for _, id := range cIDs { + sets = append(sets, ExporterSingleton(id)) + } // Connector examples. for _, exprSig := range signals { diff --git a/service/internal/builders/connector.go b/service/internal/builders/connector.go index b157d2dd64a..3bc8f36f762 100644 --- a/service/internal/builders/connector.go +++ b/service/internal/builders/connector.go @@ -376,7 +376,7 @@ func (b *ConnectorBuilder) IsConfigured(componentID component.ID) bool { return ok } -func (b *ConnectorBuilder) Factory(componentType component.Type) component.Factory { +func (b *ConnectorBuilder) Factory(componentType component.Type) connector.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/exporter.go b/service/internal/builders/exporter.go index 6570828b76f..5b881cc9a58 100644 --- a/service/internal/builders/exporter.go +++ b/service/internal/builders/exporter.go @@ -94,7 +94,7 @@ func (b *ExporterBuilder) CreateProfiles(ctx context.Context, set exporter.Setti return f.CreateProfiles(ctx, set, cfg) } -func (b *ExporterBuilder) Factory(componentType component.Type) component.Factory { +func (b *ExporterBuilder) Factory(componentType component.Type) exporter.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/processor.go b/service/internal/builders/processor.go index c0df0f3b575..2394a815c0e 100644 --- a/service/internal/builders/processor.go +++ b/service/internal/builders/processor.go @@ -108,7 +108,7 @@ func (b *ProcessorBuilder) CreateProfiles(ctx context.Context, set processor.Set return f.CreateProfiles(ctx, set, cfg, next) } -func (b *ProcessorBuilder) Factory(componentType component.Type) component.Factory { +func (b *ProcessorBuilder) Factory(componentType component.Type) processor.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/receiver.go b/service/internal/builders/receiver.go index 007d9be2187..bf715fd3250 100644 --- a/service/internal/builders/receiver.go +++ b/service/internal/builders/receiver.go @@ -110,7 +110,7 @@ func (b *ReceiverBuilder) CreateProfiles(ctx context.Context, set receiver.Setti return f.CreateProfiles(ctx, set, cfg, next) } -func (b *ReceiverBuilder) Factory(componentType component.Type) component.Factory { +func (b *ReceiverBuilder) Factory(componentType component.Type) receiver.Factory { return b.factories[componentType] } diff --git a/service/internal/components/loggers.go b/service/internal/components/loggers.go deleted file mode 100644 index f02d19fb082..00000000000 --- a/service/internal/components/loggers.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package components // import "go.opentelemetry.io/collector/service/internal/components" - -import ( - "strings" - - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pipeline" -) - -const ( - zapKindKey = "kind" - zapNameKey = "name" - zapDataTypeKey = "data_type" - zapStabilityKey = "stability" - zapPipelineKey = "pipeline" - zapExporterInPipeline = "exporter_in_pipeline" - zapReceiverInPipeline = "receiver_in_pipeline" -) - -func ReceiverLogger(logger *zap.Logger, id component.ID, dt pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindReceiver.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapDataTypeKey, dt.String())) -} - -func ProcessorLogger(logger *zap.Logger, id component.ID, pipelineID pipeline.ID) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindProcessor.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapPipelineKey, pipelineID.String())) -} - -func ExporterLogger(logger *zap.Logger, id component.ID, dt pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindExporter.String())), - zap.String(zapDataTypeKey, dt.String()), - zap.String(zapNameKey, id.String())) -} - -func ExtensionLogger(logger *zap.Logger, id component.ID) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindExtension.String())), - zap.String(zapNameKey, id.String())) -} - -func ConnectorLogger(logger *zap.Logger, id component.ID, expDT, rcvDT pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindConnector.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapExporterInPipeline, expDT.String()), - zap.String(zapReceiverInPipeline, rcvDT.String())) -} diff --git a/service/internal/components/package_test.go b/service/internal/components/package_test.go deleted file mode 100644 index 30b5a82311a..00000000000 --- a/service/internal/components/package_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package components - -import ( - "testing" - - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} diff --git a/service/internal/graph/attribute/attribute.go b/service/internal/graph/attribute/attribute.go deleted file mode 100644 index e81b1f9be23..00000000000 --- a/service/internal/graph/attribute/attribute.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package attribute // import "go.opentelemetry.io/collector/service/internal/graph/attribute" - -import ( - "fmt" - "hash/fnv" - - "go.opentelemetry.io/otel/attribute" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pipeline" -) - -const ( - componentKindKey = "otelcol.component.kind" - componentIDKey = "otelcol.component.id" - pipelineIDKey = "otelcol.pipeline.id" - signalKey = "otelcol.signal" - signalOutputKey = "otelcol.signal.output" - - receiverKind = "receiver" - processorKind = "processor" - exporterKind = "exporter" - connectorKind = "connector" - capabiltiesKind = "capabilities" - fanoutKind = "fanout" -) - -type Attributes struct { - set attribute.Set - id int64 -} - -func newAttributes(attrs ...attribute.KeyValue) *Attributes { - h := fnv.New64a() - for _, kv := range attrs { - h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")")) - } - return &Attributes{ - set: attribute.NewSet(attrs...), - id: int64(h.Sum64()), // #nosec G115 - } -} - -func (a Attributes) Attributes() *attribute.Set { - return &a.set -} - -func (a Attributes) ID() int64 { - return a.id -} - -func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { - return newAttributes( - attribute.String(componentKindKey, receiverKind), - attribute.String(signalKey, pipelineType.String()), - attribute.String(componentIDKey, id.String()), - ) -} - -func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { - return newAttributes( - attribute.String(componentKindKey, processorKind), - attribute.String(signalKey, pipelineID.Signal().String()), - attribute.String(pipelineIDKey, pipelineID.String()), - attribute.String(componentIDKey, id.String()), - ) -} - -func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes { - return newAttributes( - attribute.String(componentKindKey, exporterKind), - attribute.String(signalKey, pipelineType.String()), - attribute.String(componentIDKey, id.String()), - ) -} - -func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes { - return newAttributes( - attribute.String(componentKindKey, connectorKind), - attribute.String(signalKey, fmt.Sprintf("%s_to_%s", exprPipelineType.String(), rcvrPipelineType.String())), - attribute.String(componentIDKey, id.String()), - ) -} - -func Capabilities(pipelineID pipeline.ID) *Attributes { - return newAttributes( - attribute.String(componentKindKey, capabiltiesKind), - attribute.String(pipelineIDKey, pipelineID.String()), - ) -} - -func Fanout(pipelineID pipeline.ID) *Attributes { - return newAttributes( - attribute.String(componentKindKey, fanoutKind), - attribute.String(pipelineIDKey, pipelineID.String()), - ) -} diff --git a/service/internal/graph/attribute/attribute_test.go b/service/internal/graph/attribute/attribute_test.go deleted file mode 100644 index db2b32dc197..00000000000 --- a/service/internal/graph/attribute/attribute_test.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package attribute - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" -) - -var ( - signals = []pipeline.Signal{ - pipeline.SignalTraces, - pipeline.SignalMetrics, - pipeline.SignalLogs, - pipelineprofiles.SignalProfiles, - } - - cIDs = []component.ID{ - component.MustNewID("foo"), - component.MustNewID("foo2"), - component.MustNewID("bar"), - } - - pIDs = []pipeline.ID{ - pipeline.MustNewID("traces"), - pipeline.MustNewIDWithName("traces", "2"), - pipeline.MustNewID("metrics"), - pipeline.MustNewIDWithName("metrics", "2"), - pipeline.MustNewID("logs"), - pipeline.MustNewIDWithName("logs", "2"), - pipeline.MustNewID("profiles"), - pipeline.MustNewIDWithName("profiles", "2"), - } -) - -func TestAttributes(t *testing.T) { - // The sets are created independently but should be exactly equivalent. - // We will ensure that corresponding elements are equal and that - // non-corresponding elements are not equal. - setI, setJ := createExampleSets(), createExampleSets() - for i, ei := range setI { - for j, ej := range setJ { - if i == j { - require.Equal(t, ei.ID(), ej.ID()) - require.True(t, ei.Attributes().Equals(ej.Attributes())) - } else { - require.NotEqual(t, ei.ID(), ej.ID()) - require.False(t, ei.Attributes().Equals(ej.Attributes())) - } - } - } -} - -func createExampleSets() []*Attributes { - sets := []*Attributes{} - - // Receiver examples. - for _, sig := range signals { - for _, id := range cIDs { - sets = append(sets, Receiver(sig, id)) - } - } - - // Processor examples. - for _, pID := range pIDs { - for _, cID := range cIDs { - sets = append(sets, Processor(pID, cID)) - } - } - - // Exporter examples. - for _, sig := range signals { - for _, id := range cIDs { - sets = append(sets, Exporter(sig, id)) - } - } - - // Connector examples. - for _, exprSig := range signals { - for _, rcvrSig := range signals { - for _, id := range cIDs { - sets = append(sets, Connector(exprSig, rcvrSig, id)) - } - } - } - - // Capabilities examples. - for _, pID := range pIDs { - sets = append(sets, Capabilities(pID)) - } - - // Fanout examples. - for _, pID := range pIDs { - sets = append(sets, Fanout(pID)) - } - - return sets -} diff --git a/service/internal/graph/capabilities.go b/service/internal/graph/capabilities.go index ec8e2a576c0..fa9ed5ea142 100644 --- a/service/internal/graph/capabilities.go +++ b/service/internal/graph/capabilities.go @@ -7,7 +7,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/service/internal/graph/attribute" + "go.opentelemetry.io/collector/service/internal/attribute" ) var _ consumerNode = (*capabilitiesNode)(nil) diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index df780a20b4b..73ab6ede49e 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -13,10 +13,9 @@ import ( "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*connectorNode)(nil) @@ -51,7 +50,7 @@ func (n *connectorNode) buildComponent( builder *builders.ConnectorBuilder, nexts []baseConsumer, ) error { - tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} switch n.rcvrPipelineType { case pipeline.SignalTraces: diff --git a/service/internal/graph/exporter.go b/service/internal/graph/exporter.go index 5598121c6cc..5b852f6b64d 100644 --- a/service/internal/graph/exporter.go +++ b/service/internal/graph/exporter.go @@ -11,9 +11,8 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*exporterNode)(nil) @@ -22,16 +21,24 @@ var _ consumerNode = (*exporterNode)(nil) // Therefore, nodeID is derived from "pipeline type" and "component ID". type exporterNode struct { *attribute.Attributes - componentID component.ID - pipelineType pipeline.Signal + componentID component.ID + pipelineTypes map[pipeline.Signal]struct{} component.Component } func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { return &exporterNode{ - Attributes: attribute.Exporter(pipelineType, exprID), - componentID: exprID, - pipelineType: pipelineType, + Attributes: attribute.Exporter(pipelineType, exprID), + componentID: exprID, + pipelineTypes: map[pipeline.Signal]struct{}{pipelineType: {}}, + } +} + +func newExporterSingletonNode(exprID component.ID) *exporterNode { + return &exporterNode{ + Attributes: attribute.ExporterSingleton(exprID), + componentID: exprID, + pipelineTypes: map[pipeline.Signal]struct{}{}, } } @@ -39,29 +46,35 @@ func (n *exporterNode) getConsumer() baseConsumer { return n.Component.(baseConsumer) } +func (n *exporterNode) withSignalType(pipelineType pipeline.Signal) { + n.pipelineTypes[pipelineType] = struct{}{} +} + func (n *exporterNode) buildComponent( ctx context.Context, tel component.TelemetrySettings, info component.BuildInfo, builder *builders.ExporterBuilder, ) error { - tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error - switch n.pipelineType { - case pipeline.SignalTraces: - n.Component, err = builder.CreateTraces(ctx, set) - case pipeline.SignalMetrics: - n.Component, err = builder.CreateMetrics(ctx, set) - case pipeline.SignalLogs: - n.Component, err = builder.CreateLogs(ctx, set) - case xpipeline.SignalProfiles: - n.Component, err = builder.CreateProfiles(ctx, set) - default: - return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType) - } - if err != nil { - return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err) + for signal := range n.pipelineTypes { + switch signal { + case pipeline.SignalTraces: + n.Component, err = builder.CreateTraces(ctx, set) + case pipeline.SignalMetrics: + n.Component, err = builder.CreateMetrics(ctx, set) + case pipeline.SignalLogs: + n.Component, err = builder.CreateLogs(ctx, set) + case xpipeline.SignalProfiles: + n.Component, err = builder.CreateProfiles(ctx, set) + default: + return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, signal) + } + if err != nil { + return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, signal, err) + } } return nil } diff --git a/service/internal/graph/fanout.go b/service/internal/graph/fanout.go index 7d1a76f086f..40aa7c01d29 100644 --- a/service/internal/graph/fanout.go +++ b/service/internal/graph/fanout.go @@ -5,7 +5,7 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph" import ( "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/service/internal/graph/attribute" + "go.opentelemetry.io/collector/service/internal/attribute" ) var _ consumerNode = (*fanOutNode)(nil) diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 3f5ae33fd91..08443e35201 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -111,13 +111,28 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) continue } - rcvrNode := g.createReceiver(pipelineID, recvID) + factory := set.ReceiverBuilder.Factory(recvID.Type()) + if factory == nil { + return fmt.Errorf("receiver factory not available for: %q", recvID.Type()) + } + var rcvrNode *receiverNode + if factory.Metadata().SingletonInstance { + rcvrNode = g.singletonReceiver(recvID) + rcvrNode.withSignalType(pipelineID.Signal()) + } else { + rcvrNode = g.createReceiver(pipelineID, recvID) + } pipe.receivers[rcvrNode.ID()] = rcvrNode } pipe.capabilitiesNode = newCapabilitiesNode(pipelineID) for _, procID := range pipelineCfg.Processors { + factory := set.ProcessorBuilder.Factory(procID.Type()) + if factory == nil { + return fmt.Errorf("processor factory not available for: %q", procID.Type()) + } + procNode := g.createProcessor(pipelineID, procID) pipe.processors = append(pipe.processors, procNode) } @@ -130,8 +145,19 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID) continue } - expNode := g.createExporter(pipelineID, exprID) - pipe.exporters[expNode.ID()] = expNode + factory := set.ExporterBuilder.Factory(exprID.Type()) + if factory == nil { + return fmt.Errorf("exporter factory not available for: %q", exprID.Type()) + } + + var exprNode *exporterNode + if factory.Metadata().SingletonInstance { + exprNode = g.singletonExporter(exprID) + exprNode.withSignalType(pipelineID.Signal()) + } else { + exprNode = g.createExporter(pipelineID, exprID) + } + pipe.exporters[exprNode.ID()] = exprNode } } @@ -140,7 +166,6 @@ func (g *Graph) createNodes(set Settings) error { if factory == nil { return fmt.Errorf("connector factory not available for: %q", connID.Type()) } - connFactory := factory.(connector.Factory) expTypes := make(map[pipeline.Signal]bool) for _, pipelineID := range connectorsAsExporter[connID] { @@ -160,7 +185,7 @@ func (g *Graph) createNodes(set Settings) error { for expType := range expTypes { for recType := range recTypes { // Typechecks the connector's receiving and exporting datatypes. - if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined { + if connectorStability(factory, expType, recType) != component.StabilityLevelUndefined { expTypes[expType] = true recTypes[recType] = true } @@ -182,12 +207,12 @@ func (g *Graph) createNodes(set Settings) error { for _, eID := range connectorsAsExporter[connID] { for _, rID := range connectorsAsReceiver[connID] { - if connectorStability(connFactory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined { + if connectorStability(factory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined { // Connector is not supported for this combination, but we know it is used correctly elsewhere continue } - connNode := g.createConnector(eID, rID, connID) + connNode := g.createConnector(eID, rID, connID) g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode } @@ -196,6 +221,15 @@ func (g *Graph) createNodes(set Settings) error { return nil } +func (g *Graph) singletonReceiver(recvID component.ID) *receiverNode { + rcvrNode := newReceiverSingletonNode(recvID) + if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { + return g.componentGraph.Node(rcvrNode.ID()).(*receiverNode) + } + g.componentGraph.AddNode(rcvrNode) + return rcvrNode +} + func (g *Graph) createReceiver(pipelineID pipeline.ID, recvID component.ID) *receiverNode { rcvrNode := newReceiverNode(pipelineID.Signal(), recvID) if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { @@ -219,6 +253,15 @@ func (g *Graph) createProcessor(pipelineID pipeline.ID, procID component.ID) *pr return procNode } +func (g *Graph) singletonExporter(exprID component.ID) *exporterNode { + expNode := newExporterSingletonNode(exprID) + if node := g.componentGraph.Node(expNode.ID()); node != nil { + return g.componentGraph.Node(expNode.ID()).(*exporterNode) + } + g.componentGraph.AddNode(expNode) + return expNode +} + func (g *Graph) createExporter(pipelineID pipeline.ID, exprID component.ID) *exporterNode { expNode := newExporterNode(pipelineID.Signal(), exprID) if node := g.componentGraph.Node(expNode.ID()); node != nil { @@ -490,7 +533,9 @@ func (g *Graph) GetExporters() map[pipeline.Signal]map[component.ID]component.Co for _, expNode := range pg.exporters { // Skip connectors, otherwise individual components can introduce cycles if expNode, ok := g.componentGraph.Node(expNode.ID()).(*exporterNode); ok { - exportersMap[expNode.pipelineType][expNode.componentID] = expNode.Component + for signal := range expNode.pipelineTypes { + exportersMap[signal][expNode.componentID] = expNode.Component + } } } } @@ -529,7 +574,8 @@ func cycleErr(err error, cycles [][]graph.Node) error { case *processorNode: componentDetails = append(componentDetails, fmt.Sprintf("processor %q in pipeline %q", n.componentID, n.pipelineID.String())) case *connectorNode: - componentDetails = append(componentDetails, fmt.Sprintf("connector %q (%s to %s)", n.componentID, n.exprPipelineType, n.rcvrPipelineType)) + componentDetails = append(componentDetails, fmt.Sprintf("connector %q (%s to %s)", n.componentID, n.exprPipelineType, + n.rcvrPipelineType)) default: continue // skip capabilities/fanout nodes } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index b5d245154dd..a96632ab096 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -1096,6 +1096,208 @@ func TestInstances(t *testing.T) { component.MustNewID("exampleexporter"): 4, // one per signal }, }, + { + name: "singleton_receiver", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("exampleexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("singletonreceiver"): 1, // singleton + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("exampleexporter"): 4, // each data type has an instance + }, + }, + { + name: "singleton_exporter", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("examplereceiver"): 4, // each data type has an instance + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("singletonexporter"): 1, // singleton + }, + }, + { + name: "singleton_receiver_and_exporter", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("singletonreceiver"): 1, // singleton + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("singletonexporter"): 1, // singleton + }, + }, + { + name: "mixed", + pipelineConfigs: pipelines.Config{ + pipeline.NewIDWithName(pipeline.SignalTraces, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalTraces, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalMetrics, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(pipeline.SignalLogs, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "1"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + pipeline.NewIDWithName(xpipeline.SignalProfiles, "2"): { + Receivers: []component.ID{component.MustNewID("singletonreceiver"), component.MustNewID("examplereceiver")}, + Processors: []component.ID{component.MustNewID("exampleprocessor")}, + Exporters: []component.ID{component.MustNewID("singletonexporter"), component.MustNewID("exampleexporter")}, + }, + }, + expectInstances: map[component.ID]int{ + component.MustNewID("singletonreceiver"): 1, // singleton + component.MustNewID("examplereceiver"): 4, // one per signal + component.MustNewID("exampleprocessor"): 8, // one per pipeline + component.MustNewID("singletonexporter"): 1, // singleton + component.MustNewID("exampleexporter"): 4, // one per signal + }, + }, } for _, tt := range tests { @@ -1105,10 +1307,12 @@ func TestInstances(t *testing.T) { BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: builders.NewReceiver( map[component.ID]component.Config{ - component.MustNewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), + component.MustNewID("examplereceiver"): testcomponents.ExampleReceiverFactory.CreateDefaultConfig(), + component.MustNewID("singletonreceiver"): testcomponents.SingletonReceiverFactory.CreateDefaultConfig(), }, map[component.Type]receiver.Factory{ - testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory, + testcomponents.ExampleReceiverFactory.Type(): testcomponents.ExampleReceiverFactory, + testcomponents.SingletonReceiverFactory.Type(): testcomponents.SingletonReceiverFactory, }, ), ProcessorBuilder: builders.NewProcessor( @@ -1121,10 +1325,12 @@ func TestInstances(t *testing.T) { ), ExporterBuilder: builders.NewExporter( map[component.ID]component.Config{ - component.MustNewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), + component.MustNewID("exampleexporter"): testcomponents.ExampleExporterFactory.CreateDefaultConfig(), + component.MustNewID("singletonexporter"): testcomponents.SingletonExporterFactory.CreateDefaultConfig(), }, map[component.Type]exporter.Factory{ - testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory, + testcomponents.ExampleExporterFactory.Type(): testcomponents.ExampleExporterFactory, + testcomponents.SingletonExporterFactory.Type(): testcomponents.SingletonExporterFactory, }, ), ConnectorBuilder: builders.NewConnector(map[component.ID]component.Config{}, map[component.Type]connector.Factory{}), @@ -2321,11 +2527,11 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/conn" (logs to logs) -> ` + - `processor "nop" in pipeline "logs/1" -> ` + `connector "nop/conn1" (logs to logs) -> ` + `processor "nop" in pipeline "logs/2" -> ` + - `connector "nop/conn" (logs to logs)`, + `connector "nop/conn" (logs to logs) -> ` + + `processor "nop" in pipeline "logs/1" -> ` + + `connector "nop/conn1" (logs to logs)`, }, { name: "not_allowed_deep_cycle_profiles.yaml", @@ -2427,13 +2633,13 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/forkagain" (traces to traces) -> ` + - `processor "nop" in pipeline "traces/copy2b" -> ` + `connector "nop/rawlog" (traces to logs) -> ` + `processor "nop" in pipeline "logs/raw" -> ` + `connector "nop/fork" (logs to traces) -> ` + `processor "nop" in pipeline "traces/copy2" -> ` + - `connector "nop/forkagain" (traces to traces)`, + `connector "nop/forkagain" (traces to traces) -> ` + + `processor "nop" in pipeline "traces/copy2b" -> ` + + `connector "nop/rawlog" (traces to logs)`, }, { name: "unknown_exporter_config", @@ -2465,7 +2671,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("unknown")}, }, }, - expected: "failed to create \"unknown\" exporter for data type \"traces\": exporter factory not available for: \"unknown\"", + expected: "exporter factory not available for: \"unknown\"", }, { name: "unknown_processor_config", @@ -2505,7 +2711,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("nop")}, }, }, - expected: "failed to create \"unknown\" processor, in pipeline \"metrics\": processor factory not available for: \"unknown\"", + expected: "processor factory not available for: \"unknown\"", }, { name: "unknown_receiver_config", @@ -2537,7 +2743,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("nop")}, }, }, - expected: "failed to create \"unknown\" receiver for data type \"logs\": receiver factory not available for: \"unknown\"", + expected: "receiver factory not available for: \"unknown\"", }, { name: "unknown_connector_factory", diff --git a/service/internal/graph/processor.go b/service/internal/graph/processor.go index cfe36cb461c..77af19e8d6c 100644 --- a/service/internal/graph/processor.go +++ b/service/internal/graph/processor.go @@ -13,9 +13,8 @@ import ( "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*processorNode)(nil) @@ -47,7 +46,7 @@ func (n *processorNode) buildComponent(ctx context.Context, builder *builders.ProcessorBuilder, next baseConsumer, ) error { - tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) + tel.Logger = n.Attributes.Logger(tel.Logger) set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error switch n.pipelineID.Signal() { diff --git a/service/internal/graph/receiver.go b/service/internal/graph/receiver.go index 57dfcded117..19a40e52b07 100644 --- a/service/internal/graph/receiver.go +++ b/service/internal/graph/receiver.go @@ -14,67 +14,80 @@ import ( "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) // A receiver instance can be shared by multiple pipelines of the same type. // Therefore, nodeID is derived from "pipeline type" and "component ID". type receiverNode struct { *attribute.Attributes - componentID component.ID - pipelineType pipeline.Signal + componentID component.ID + pipelineTypes map[pipeline.Signal]struct{} component.Component } func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode { return &receiverNode{ - Attributes: attribute.Receiver(pipelineType, recvID), - componentID: recvID, - pipelineType: pipelineType, + Attributes: attribute.Receiver(pipelineType, recvID), + componentID: recvID, + pipelineTypes: map[pipeline.Signal]struct{}{pipelineType: {}}, } } +func newReceiverSingletonNode(recvID component.ID) *receiverNode { + return &receiverNode{ + Attributes: attribute.ReceiverSingleton(recvID), + componentID: recvID, + pipelineTypes: map[pipeline.Signal]struct{}{}, + } +} + +func (n *receiverNode) withSignalType(pipelineType pipeline.Signal) { + n.pipelineTypes[pipelineType] = struct{}{} +} + func (n *receiverNode) buildComponent(ctx context.Context, tel component.TelemetrySettings, info component.BuildInfo, builder *builders.ReceiverBuilder, nexts []baseConsumer, ) error { - tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error - switch n.pipelineType { - case pipeline.SignalTraces: - var consumers []consumer.Traces - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Traces)) - } - n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers)) - case pipeline.SignalMetrics: - var consumers []consumer.Metrics - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Metrics)) + for signal := range n.pipelineTypes { + switch signal { + case pipeline.SignalTraces: + var consumers []consumer.Traces + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Traces)) + } + n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers)) + case pipeline.SignalMetrics: + var consumers []consumer.Metrics + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Metrics)) + } + n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers)) + case pipeline.SignalLogs: + var consumers []consumer.Logs + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Logs)) + } + n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) + case xpipeline.SignalProfiles: + var consumers []xconsumer.Profiles + for _, next := range nexts { + consumers = append(consumers, next.(xconsumer.Profiles)) + } + n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers)) + default: + return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, signal) } - n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers)) - case pipeline.SignalLogs: - var consumers []consumer.Logs - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Logs)) + if err != nil { + return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, signal, err) } - n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) - case xpipeline.SignalProfiles: - var consumers []xconsumer.Profiles - for _, next := range nexts { - consumers = append(consumers, next.(xconsumer.Profiles)) - } - n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers)) - default: - return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType) - } - if err != nil { - return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, n.pipelineType, err) } return nil } diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go index 5e27bf6f340..84d8abc70e7 100644 --- a/service/internal/graph/util_test.go +++ b/service/internal/graph/util_test.go @@ -91,7 +91,9 @@ func (g *Graph) getReceivers() map[pipeline.Signal]map[component.ID]component.Co if !ok { continue } - receiversMap[rcvrNode.pipelineType][rcvrNode.componentID] = rcvrNode.Component + for signal := range rcvrNode.pipelineTypes { + receiversMap[signal][rcvrNode.componentID] = rcvrNode.Component + } } } return receiversMap diff --git a/service/internal/testcomponents/example_receiver.go b/service/internal/testcomponents/example_receiver.go index 94501183bda..d8fff8af3fa 100644 --- a/service/internal/testcomponents/example_receiver.go +++ b/service/internal/testcomponents/example_receiver.go @@ -33,63 +33,40 @@ func createReceiverDefaultConfig() component.Config { func createTracesReceiver( _ context.Context, _ receiver.Settings, - cfg component.Config, + _ component.Config, nextConsumer consumer.Traces, ) (receiver.Traces, error) { - tr := createReceiver(cfg) - tr.ConsumeTracesFunc = nextConsumer.ConsumeTraces - return tr, nil + return &ExampleReceiver{ConsumeTracesFunc: nextConsumer.ConsumeTraces}, nil } // createMetrics creates a receiver.Metrics based on this config. func createMetricsReceiver( _ context.Context, _ receiver.Settings, - cfg component.Config, + _ component.Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { - mr := createReceiver(cfg) - mr.ConsumeMetricsFunc = nextConsumer.ConsumeMetrics - return mr, nil + return &ExampleReceiver{ConsumeMetricsFunc: nextConsumer.ConsumeMetrics}, nil } // createLogs creates a receiver.Logs based on this config. func createLogsReceiver( _ context.Context, _ receiver.Settings, - cfg component.Config, + _ component.Config, nextConsumer consumer.Logs, ) (receiver.Logs, error) { - lr := createReceiver(cfg) - lr.ConsumeLogsFunc = nextConsumer.ConsumeLogs - return lr, nil + return &ExampleReceiver{ConsumeLogsFunc: nextConsumer.ConsumeLogs}, nil } // createProfiles creates a receiver.Profiles based on this config. func createProfilesReceiver( _ context.Context, _ receiver.Settings, - cfg component.Config, + _ component.Config, nextConsumer xconsumer.Profiles, ) (xreceiver.Profiles, error) { - tr := createReceiver(cfg) - tr.ConsumeProfilesFunc = nextConsumer.ConsumeProfiles - return tr, nil -} - -func createReceiver(cfg component.Config) *ExampleReceiver { - // There must be one receiver for all data types. We maintain a map of - // receivers per config. - - // Check to see if there is already a receiver for this config. - er, ok := exampleReceivers[cfg] - if !ok { - er = &ExampleReceiver{} - // Remember the receiver in the map - exampleReceivers[cfg] = er - } - - return er + return &ExampleReceiver{ConsumeProfilesFunc: nextConsumer.ConsumeProfiles}, nil } // ExampleReceiver allows producing traces, metrics, logs and profiles for testing purposes. @@ -100,9 +77,3 @@ type ExampleReceiver struct { consumer.ConsumeLogsFunc xconsumer.ConsumeProfilesFunc } - -// This is the map of already created example receivers for particular configurations. -// We maintain this map because the receiver.Factory is asked trace and metric receivers separately -// when it gets CreateTraces() and CreateMetrics() but they must not -// create separate objects, they must use one Receiver object per configuration. -var exampleReceivers = map[component.Config]*ExampleReceiver{} diff --git a/service/internal/testcomponents/singleton_exporter.go b/service/internal/testcomponents/singleton_exporter.go new file mode 100644 index 00000000000..0c98a58c0fc --- /dev/null +++ b/service/internal/testcomponents/singleton_exporter.go @@ -0,0 +1,124 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testcomponents // import "go.opentelemetry.io/collector/service/internal/testcomponents" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/xexporter" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +var singletonExporterType = component.MustNewType("singletonexporter") + +// SingletonExporterFactory is factory for SingletonExporter. +var SingletonExporterFactory = xexporter.NewFactory( + singletonExporterType, + createSingletonExporterDefaultConfig, + xexporter.WithTraces(createSingletonTracesExporter, component.StabilityLevelDevelopment), + xexporter.WithMetrics(createSingletonMetricsExporter, component.StabilityLevelDevelopment), + xexporter.WithLogs(createSingletonLogsExporter, component.StabilityLevelDevelopment), + xexporter.WithProfiles(createSingletonProfilesExporter, component.StabilityLevelDevelopment), + xexporter.AsSingletonInstance(), +) + +func createSingletonExporterDefaultConfig() component.Config { + return &struct{}{} +} + +func createSingletonTracesExporter( + _ context.Context, + _ exporter.Settings, + cfg component.Config, +) (exporter.Traces, error) { + return createSingletonExporter(cfg), nil +} + +func createSingletonMetricsExporter( + _ context.Context, + _ exporter.Settings, + cfg component.Config, +) (exporter.Metrics, error) { + return createSingletonExporter(cfg), nil +} + +func createSingletonLogsExporter( + _ context.Context, + _ exporter.Settings, + cfg component.Config, +) (exporter.Logs, error) { + return createSingletonExporter(cfg), nil +} + +func createSingletonProfilesExporter( + _ context.Context, + _ exporter.Settings, + cfg component.Config, +) (xexporter.Profiles, error) { + return createSingletonExporter(cfg), nil +} + +func createSingletonExporter(cfg component.Config) *SingletonExporter { + // There must be one receiver for all data types. We maintain a map of + // receivers per config. + + // Check to see if there is already a receiver for this config. + sr, ok := singletonExporters[cfg] + if !ok { + sr = &SingletonExporter{} + // Remember the receiver in the map + singletonExporters[cfg] = sr + } + + return sr +} + +// SingletonExporter stores consumed traces, metrics, logs and profiles for testing purposes. +type SingletonExporter struct { + componentState + Traces []ptrace.Traces + Metrics []pmetric.Metrics + Logs []plog.Logs + Profiles []pprofile.Profiles +} + +// ConsumeTraces receives ptrace.Traces for processing by the consumer.Traces. +func (exp *SingletonExporter) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + exp.Traces = append(exp.Traces, td) + return nil +} + +// ConsumeMetrics receives pmetric.Metrics for processing by the Metrics. +func (exp *SingletonExporter) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { + exp.Metrics = append(exp.Metrics, md) + return nil +} + +// ConsumeLogs receives plog.Logs for processing by the Logs. +func (exp *SingletonExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error { + exp.Logs = append(exp.Logs, ld) + return nil +} + +// ConsumeProfiles receives pprofile.Profiles for processing by the xconsumer.Profiles. +func (exp *SingletonExporter) ConsumeProfiles(_ context.Context, td pprofile.Profiles) error { + exp.Profiles = append(exp.Profiles, td) + return nil +} + +func (exp *SingletonExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// This is the map of already created singleton exporters for particular configurations. +// We maintain this map because the receiver.Factory is asked trace and metric exporters separately +// when it gets CreateTraces() and CreateMetrics() but they must not +// create separate objects, they must use one Receiver object per configuration. +var singletonExporters = map[component.Config]*SingletonExporter{} diff --git a/service/internal/testcomponents/singleton_exporter_test.go b/service/internal/testcomponents/singleton_exporter_test.go new file mode 100644 index 00000000000..e192d6d0654 --- /dev/null +++ b/service/internal/testcomponents/singleton_exporter_test.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testcomponents + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestSingletonExporter(t *testing.T) { + exp := &SingletonExporter{} + host := componenttest.NewNopHost() + assert.False(t, exp.Started()) + require.NoError(t, exp.Start(context.Background(), host)) + assert.True(t, exp.Started()) + + assert.Empty(t, exp.Traces) + require.NoError(t, exp.ConsumeTraces(context.Background(), ptrace.Traces{})) + assert.Len(t, exp.Traces, 1) + + assert.Empty(t, exp.Metrics) + require.NoError(t, exp.ConsumeMetrics(context.Background(), pmetric.Metrics{})) + assert.Len(t, exp.Metrics, 1) + + assert.Empty(t, exp.Logs) + require.NoError(t, exp.ConsumeLogs(context.Background(), plog.Logs{})) + assert.Len(t, exp.Logs, 1) + + assert.Empty(t, exp.Profiles) + require.NoError(t, exp.ConsumeProfiles(context.Background(), pprofile.Profiles{})) + assert.Len(t, exp.Profiles, 1) + + assert.False(t, exp.Stopped()) + require.NoError(t, exp.Shutdown(context.Background())) + assert.True(t, exp.Stopped()) +} diff --git a/service/internal/testcomponents/singleton_receiver.go b/service/internal/testcomponents/singleton_receiver.go new file mode 100644 index 00000000000..f8f1afcb561 --- /dev/null +++ b/service/internal/testcomponents/singleton_receiver.go @@ -0,0 +1,105 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testcomponents // import "go.opentelemetry.io/collector/service/internal/testcomponents" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/xreceiver" +) + +var singletonReceiverType = component.MustNewType("singletonreceiver") + +// SingletonReceiverFactory is factory for SingletonReceiver. +var SingletonReceiverFactory = xreceiver.NewFactory( + singletonReceiverType, + createSingletonReceiverDefaultConfig, + xreceiver.WithTraces(createSingletonTracesReceiver, component.StabilityLevelDevelopment), + xreceiver.WithMetrics(createSingletonMetricsReceiver, component.StabilityLevelDevelopment), + xreceiver.WithLogs(createSingletonLogsReceiver, component.StabilityLevelDevelopment), + xreceiver.WithProfiles(createSingletonProfilesReceiver, component.StabilityLevelDevelopment), + xreceiver.AsSingletonInstance(), +) + +func createSingletonReceiverDefaultConfig() component.Config { + return &struct{}{} +} + +func createSingletonTracesReceiver( + _ context.Context, + _ receiver.Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (receiver.Traces, error) { + r := createSingletonReceiver(cfg) + r.ConsumeTracesFunc = nextConsumer.ConsumeTraces + return r, nil +} + +func createSingletonMetricsReceiver( + _ context.Context, + _ receiver.Settings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (receiver.Metrics, error) { + r := createSingletonReceiver(cfg) + r.ConsumeMetricsFunc = nextConsumer.ConsumeMetrics + return r, nil +} + +func createSingletonLogsReceiver( + _ context.Context, + _ receiver.Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (receiver.Logs, error) { + r := createSingletonReceiver(cfg) + r.ConsumeLogsFunc = nextConsumer.ConsumeLogs + return r, nil +} + +func createSingletonProfilesReceiver( + _ context.Context, + _ receiver.Settings, + cfg component.Config, + nextConsumer xconsumer.Profiles, +) (xreceiver.Profiles, error) { + r := createSingletonReceiver(cfg) + r.ConsumeProfilesFunc = nextConsumer.ConsumeProfiles + return r, nil +} + +func createSingletonReceiver(cfg component.Config) *SingletonReceiver { + // There must be one receiver for all data types. We maintain a map of + // receivers per config. + + // Check to see if there is already a receiver for this config. + sr, ok := singletonReceivers[cfg] + if !ok { + sr = &SingletonReceiver{} + // Remember the receiver in the map + singletonReceivers[cfg] = sr + } + + return sr +} + +// SingletonReceiver allows producing traces, metrics, logs and profiles for testing purposes. +type SingletonReceiver struct { + componentState + consumer.ConsumeTracesFunc + consumer.ConsumeMetricsFunc + consumer.ConsumeLogsFunc + xconsumer.ConsumeProfilesFunc +} + +// This is the map of already created singleton receivers for particular configurations. +// We maintain this map because the receiver.Factory is asked trace and metric receivers separately +// when it gets CreateTraces() and CreateMetrics() but they must not +// create separate objects, they must use one Receiver object per configuration. +var singletonReceivers = map[component.Config]*SingletonReceiver{} diff --git a/service/internal/testcomponents/singleton_receiver_test.go b/service/internal/testcomponents/singleton_receiver_test.go new file mode 100644 index 00000000000..38f3366a8e0 --- /dev/null +++ b/service/internal/testcomponents/singleton_receiver_test.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testcomponents + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestSingletonReceiver(t *testing.T) { + rcv := &SingletonReceiver{} + host := componenttest.NewNopHost() + assert.False(t, rcv.Started()) + require.NoError(t, rcv.Start(context.Background(), host)) + assert.True(t, rcv.Started()) + + assert.False(t, rcv.Stopped()) + require.NoError(t, rcv.Shutdown(context.Background())) + assert.True(t, rcv.Stopped()) +}