From 0717a5d0d77fb3d33710ef59b844c2ad0137c991 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Wed, 8 Jan 2025 13:36:35 -0600 Subject: [PATCH] Use attributes in loggers --- service/extensions/extensions.go | 6 +- .../{graph => }/attribute/attribute.go | 55 +++- service/internal/attribute/attribute_test.go | 241 ++++++++++++++++++ service/internal/builders/connector.go | 2 +- service/internal/builders/exporter.go | 2 +- service/internal/builders/processor.go | 2 +- service/internal/builders/receiver.go | 2 +- service/internal/components/loggers.go | 58 ----- service/internal/components/package_test.go | 14 - .../graph/attribute/attribute_test.go | 104 -------- service/internal/graph/capabilities.go | 2 +- service/internal/graph/connector.go | 15 +- service/internal/graph/exporter.go | 15 +- service/internal/graph/fanout.go | 2 +- service/internal/graph/graph.go | 38 ++- service/internal/graph/graph_test.go | 18 +- service/internal/graph/processor.go | 5 +- service/internal/graph/receiver.go | 15 +- 18 files changed, 361 insertions(+), 235 deletions(-) rename service/internal/{graph => }/attribute/attribute.go (60%) create mode 100644 service/internal/attribute/attribute_test.go delete mode 100644 service/internal/components/loggers.go delete mode 100644 service/internal/components/package_test.go delete mode 100644 service/internal/graph/attribute/attribute_test.go diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index fcefcb33f201..dd5204ac1824 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -17,8 +17,8 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensioncapabilities" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -38,7 +38,7 @@ type Extensions struct { func (bes *Extensions) Start(ctx context.Context, host component.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for _, extID := range bes.extensionIDs { - extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) + extLogger := attribute.Extension(extID).Logger(bes.telemetry.Logger) extLogger.Info("Extension is starting...") instanceID := bes.instanceIDs[extID] ext := bes.extMap[extID] @@ -216,7 +216,7 @@ func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Ext BuildInfo: set.BuildInfo, ModuleInfo: set.ModuleInfo, } - extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID) + extSet.TelemetrySettings.Logger = attribute.Extension(extID).Logger(set.Telemetry.Logger) ext, err := set.Extensions.Create(ctx, extSet) if err != nil { diff --git a/service/internal/graph/attribute/attribute.go b/service/internal/attribute/attribute.go similarity index 60% rename from service/internal/graph/attribute/attribute.go rename to service/internal/attribute/attribute.go index e81b1f9be23b..949ee6046b3f 100644 --- a/service/internal/graph/attribute/attribute.go +++ b/service/internal/attribute/attribute.go @@ -1,13 +1,13 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package attribute // import "go.opentelemetry.io/collector/service/internal/graph/attribute" +package attribute // import "go.opentelemetry.io/collector/service/internal/attribute" import ( - "fmt" "hash/fnv" "go.opentelemetry.io/otel/attribute" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pipeline" @@ -20,10 +20,6 @@ const ( signalKey = "otelcol.signal" signalOutputKey = "otelcol.signal.output" - receiverKind = "receiver" - processorKind = "processor" - exporterKind = "exporter" - connectorKind = "connector" capabiltiesKind = "capabilities" fanoutKind = "fanout" ) @@ -52,17 +48,32 @@ func (a Attributes) ID() int64 { return a.id } +func (a Attributes) Logger(logger *zap.Logger) *zap.Logger { + fields := make([]zap.Field, 0, a.set.Len()) + for _, kv := range a.set.ToSlice() { + fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString())) + } + return logger.With(fields...) +} + func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { return newAttributes( - attribute.String(componentKindKey, receiverKind), + attribute.String(componentKindKey, component.KindReceiver.String()), attribute.String(signalKey, pipelineType.String()), attribute.String(componentIDKey, id.String()), ) } +func ReceiverSingleton(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindReceiver.String()), + attribute.String(componentIDKey, id.String()), + ) +} + func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { return newAttributes( - attribute.String(componentKindKey, processorKind), + attribute.String(componentKindKey, component.KindProcessor.String()), attribute.String(signalKey, pipelineID.Signal().String()), attribute.String(pipelineIDKey, pipelineID.String()), attribute.String(componentIDKey, id.String()), @@ -71,16 +82,31 @@ func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes { return newAttributes( - attribute.String(componentKindKey, exporterKind), + attribute.String(componentKindKey, component.KindExporter.String()), attribute.String(signalKey, pipelineType.String()), attribute.String(componentIDKey, id.String()), ) } +func ExporterSingleton(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindExporter.String()), + attribute.String(componentIDKey, id.String()), + ) +} + func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes { return newAttributes( - attribute.String(componentKindKey, connectorKind), - attribute.String(signalKey, fmt.Sprintf("%s_to_%s", exprPipelineType.String(), rcvrPipelineType.String())), + attribute.String(componentKindKey, component.KindConnector.String()), + attribute.String(signalKey, exprPipelineType.String()), + attribute.String(signalOutputKey, rcvrPipelineType.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +func ConnectorSingleton(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindConnector.String()), attribute.String(componentIDKey, id.String()), ) } @@ -98,3 +124,10 @@ func Fanout(pipelineID pipeline.ID) *Attributes { attribute.String(pipelineIDKey, pipelineID.String()), ) } + +func Extension(id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, component.KindExtension.String()), + attribute.String(componentIDKey, id.String()), + ) +} diff --git a/service/internal/attribute/attribute_test.go b/service/internal/attribute/attribute_test.go new file mode 100644 index 000000000000..791bd200e818 --- /dev/null +++ b/service/internal/attribute/attribute_test.go @@ -0,0 +1,241 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attribute + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" +) + +var ( + signals = []pipeline.Signal{ + pipeline.SignalTraces, + pipeline.SignalMetrics, + pipeline.SignalLogs, + xpipeline.SignalProfiles, + } + + cIDs = []component.ID{ + component.MustNewID("foo"), + component.MustNewID("foo2"), + component.MustNewID("bar"), + } + + pIDs = []pipeline.ID{ + pipeline.MustNewID("traces"), + pipeline.MustNewIDWithName("traces", "2"), + pipeline.MustNewID("metrics"), + pipeline.MustNewIDWithName("metrics", "2"), + pipeline.MustNewID("logs"), + pipeline.MustNewIDWithName("logs", "2"), + pipeline.MustNewID("profiles"), + pipeline.MustNewIDWithName("profiles", "2"), + } +) + +func TestReceiver(t *testing.T) { + for _, sig := range signals { + for _, id := range cIDs { + r := Receiver(sig, id) + componentKind, ok := r.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindReceiver, componentKind.AsString()) + + signal, ok := r.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, sig.String(), signal.AsString()) + + componentID, ok := r.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestReceiverSingleton(t *testing.T) { + for _, id := range cIDs { + r := ReceiverSingleton(id) + componentKind, ok := r.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindReceiver, componentKind.AsString()) + + componentID, ok := r.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } +} + +func TestProcessor(t *testing.T) { + for _, pID := range pIDs { + for _, id := range cIDs { + p := Processor(pID, id) + componentKind, ok := p.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindProcessor, componentKind.AsString()) + + pipelineID, ok := p.Attributes().Value(pipelineIDKey) + require.True(t, ok) + require.Equal(t, pID.String(), pipelineID.AsString()) + + componentID, ok := p.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestExporter(t *testing.T) { + for _, sig := range signals { + for _, id := range cIDs { + e := Exporter(sig, id) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExporter, componentKind.AsString()) + + signal, ok := e.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, sig.String(), signal.AsString()) + + componentID, ok := e.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } +} + +func TestExporterSingleton(t *testing.T) { + for _, id := range cIDs { + e := ExporterSingleton(id) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExporter, componentKind.AsString()) + + componentID, ok := e.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } +} + +func TestConnector(t *testing.T) { + for _, exprSig := range signals { + for _, rcvrSig := range signals { + for _, id := range cIDs { + c := Connector(exprSig, rcvrSig, id) + componentKind, ok := c.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindConnector, componentKind.AsString()) + + signal, ok := c.Attributes().Value(signalKey) + require.True(t, ok) + require.Equal(t, exprSig.String(), signal.AsString()) + + signalOutput, ok := c.Attributes().Value(signalOutputKey) + require.True(t, ok) + require.Equal(t, rcvrSig.String(), signalOutput.AsString()) + + componentID, ok := c.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } + } + } +} + +func TestConnectorSingleton(t *testing.T) { + for _, id := range cIDs { + c := ConnectorSingleton(id) + componentKind, ok := c.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindConnector, componentKind.AsString()) + + componentID, ok := c.Attributes().Value(componentIDKey) + require.True(t, ok) + require.Equal(t, id.String(), componentID.AsString()) + } +} + +func TestExtension(t *testing.T) { + e := Extension(component.MustNewID("foo")) + componentKind, ok := e.Attributes().Value(componentKindKey) + require.True(t, ok) + require.Equal(t, component.KindExtension, componentKind.AsString()) +} + +func TestSetEquality(t *testing.T) { + // The sets are created independently but should be exactly equivalent. + // We will ensure that corresponding elements are equal and that + // non-corresponding elements are not equal. + setI, setJ := createExampleSets(), createExampleSets() + for i, ei := range setI { + for j, ej := range setJ { + if i == j { + require.Equal(t, ei.ID(), ej.ID()) + require.True(t, ei.Attributes().Equals(ej.Attributes())) + } else { + require.NotEqual(t, ei.ID(), ej.ID()) + require.False(t, ei.Attributes().Equals(ej.Attributes())) + } + } + } +} + +func createExampleSets() []*Attributes { + sets := []*Attributes{} + + // Receiver examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Receiver(sig, id)) + } + } + for _, id := range cIDs { + sets = append(sets, ReceiverSingleton(id)) + } + + // Processor examples. + for _, pID := range pIDs { + for _, cID := range cIDs { + sets = append(sets, Processor(pID, cID)) + } + } + + // Exporter examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Exporter(sig, id)) + } + } + for _, id := range cIDs { + sets = append(sets, ExporterSingleton(id)) + } + + // Connector examples. + for _, exprSig := range signals { + for _, rcvrSig := range signals { + for _, id := range cIDs { + sets = append(sets, Connector(exprSig, rcvrSig, id)) + } + } + } + for _, id := range cIDs { + sets = append(sets, ConnectorSingleton(id)) + } + + // Capabilities examples. + for _, pID := range pIDs { + sets = append(sets, Capabilities(pID)) + } + + // Fanout examples. + for _, pID := range pIDs { + sets = append(sets, Fanout(pID)) + } + + return sets +} diff --git a/service/internal/builders/connector.go b/service/internal/builders/connector.go index b157d2dd64a1..3bc8f36f7627 100644 --- a/service/internal/builders/connector.go +++ b/service/internal/builders/connector.go @@ -376,7 +376,7 @@ func (b *ConnectorBuilder) IsConfigured(componentID component.ID) bool { return ok } -func (b *ConnectorBuilder) Factory(componentType component.Type) component.Factory { +func (b *ConnectorBuilder) Factory(componentType component.Type) connector.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/exporter.go b/service/internal/builders/exporter.go index 6570828b76fe..5b881cc9a58e 100644 --- a/service/internal/builders/exporter.go +++ b/service/internal/builders/exporter.go @@ -94,7 +94,7 @@ func (b *ExporterBuilder) CreateProfiles(ctx context.Context, set exporter.Setti return f.CreateProfiles(ctx, set, cfg) } -func (b *ExporterBuilder) Factory(componentType component.Type) component.Factory { +func (b *ExporterBuilder) Factory(componentType component.Type) exporter.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/processor.go b/service/internal/builders/processor.go index c0df0f3b5751..2394a815c0e3 100644 --- a/service/internal/builders/processor.go +++ b/service/internal/builders/processor.go @@ -108,7 +108,7 @@ func (b *ProcessorBuilder) CreateProfiles(ctx context.Context, set processor.Set return f.CreateProfiles(ctx, set, cfg, next) } -func (b *ProcessorBuilder) Factory(componentType component.Type) component.Factory { +func (b *ProcessorBuilder) Factory(componentType component.Type) processor.Factory { return b.factories[componentType] } diff --git a/service/internal/builders/receiver.go b/service/internal/builders/receiver.go index 007d9be21870..bf715fd32503 100644 --- a/service/internal/builders/receiver.go +++ b/service/internal/builders/receiver.go @@ -110,7 +110,7 @@ func (b *ReceiverBuilder) CreateProfiles(ctx context.Context, set receiver.Setti return f.CreateProfiles(ctx, set, cfg, next) } -func (b *ReceiverBuilder) Factory(componentType component.Type) component.Factory { +func (b *ReceiverBuilder) Factory(componentType component.Type) receiver.Factory { return b.factories[componentType] } diff --git a/service/internal/components/loggers.go b/service/internal/components/loggers.go deleted file mode 100644 index f02d19fb0829..000000000000 --- a/service/internal/components/loggers.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package components // import "go.opentelemetry.io/collector/service/internal/components" - -import ( - "strings" - - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pipeline" -) - -const ( - zapKindKey = "kind" - zapNameKey = "name" - zapDataTypeKey = "data_type" - zapStabilityKey = "stability" - zapPipelineKey = "pipeline" - zapExporterInPipeline = "exporter_in_pipeline" - zapReceiverInPipeline = "receiver_in_pipeline" -) - -func ReceiverLogger(logger *zap.Logger, id component.ID, dt pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindReceiver.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapDataTypeKey, dt.String())) -} - -func ProcessorLogger(logger *zap.Logger, id component.ID, pipelineID pipeline.ID) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindProcessor.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapPipelineKey, pipelineID.String())) -} - -func ExporterLogger(logger *zap.Logger, id component.ID, dt pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindExporter.String())), - zap.String(zapDataTypeKey, dt.String()), - zap.String(zapNameKey, id.String())) -} - -func ExtensionLogger(logger *zap.Logger, id component.ID) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindExtension.String())), - zap.String(zapNameKey, id.String())) -} - -func ConnectorLogger(logger *zap.Logger, id component.ID, expDT, rcvDT pipeline.Signal) *zap.Logger { - return logger.With( - zap.String(zapKindKey, strings.ToLower(component.KindConnector.String())), - zap.String(zapNameKey, id.String()), - zap.String(zapExporterInPipeline, expDT.String()), - zap.String(zapReceiverInPipeline, rcvDT.String())) -} diff --git a/service/internal/components/package_test.go b/service/internal/components/package_test.go deleted file mode 100644 index 30b5a82311ae..000000000000 --- a/service/internal/components/package_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package components - -import ( - "testing" - - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} diff --git a/service/internal/graph/attribute/attribute_test.go b/service/internal/graph/attribute/attribute_test.go deleted file mode 100644 index db2b32dc1970..000000000000 --- a/service/internal/graph/attribute/attribute_test.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package attribute - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/pipelineprofiles" -) - -var ( - signals = []pipeline.Signal{ - pipeline.SignalTraces, - pipeline.SignalMetrics, - pipeline.SignalLogs, - pipelineprofiles.SignalProfiles, - } - - cIDs = []component.ID{ - component.MustNewID("foo"), - component.MustNewID("foo2"), - component.MustNewID("bar"), - } - - pIDs = []pipeline.ID{ - pipeline.MustNewID("traces"), - pipeline.MustNewIDWithName("traces", "2"), - pipeline.MustNewID("metrics"), - pipeline.MustNewIDWithName("metrics", "2"), - pipeline.MustNewID("logs"), - pipeline.MustNewIDWithName("logs", "2"), - pipeline.MustNewID("profiles"), - pipeline.MustNewIDWithName("profiles", "2"), - } -) - -func TestAttributes(t *testing.T) { - // The sets are created independently but should be exactly equivalent. - // We will ensure that corresponding elements are equal and that - // non-corresponding elements are not equal. - setI, setJ := createExampleSets(), createExampleSets() - for i, ei := range setI { - for j, ej := range setJ { - if i == j { - require.Equal(t, ei.ID(), ej.ID()) - require.True(t, ei.Attributes().Equals(ej.Attributes())) - } else { - require.NotEqual(t, ei.ID(), ej.ID()) - require.False(t, ei.Attributes().Equals(ej.Attributes())) - } - } - } -} - -func createExampleSets() []*Attributes { - sets := []*Attributes{} - - // Receiver examples. - for _, sig := range signals { - for _, id := range cIDs { - sets = append(sets, Receiver(sig, id)) - } - } - - // Processor examples. - for _, pID := range pIDs { - for _, cID := range cIDs { - sets = append(sets, Processor(pID, cID)) - } - } - - // Exporter examples. - for _, sig := range signals { - for _, id := range cIDs { - sets = append(sets, Exporter(sig, id)) - } - } - - // Connector examples. - for _, exprSig := range signals { - for _, rcvrSig := range signals { - for _, id := range cIDs { - sets = append(sets, Connector(exprSig, rcvrSig, id)) - } - } - } - - // Capabilities examples. - for _, pID := range pIDs { - sets = append(sets, Capabilities(pID)) - } - - // Fanout examples. - for _, pID := range pIDs { - sets = append(sets, Fanout(pID)) - } - - return sets -} diff --git a/service/internal/graph/capabilities.go b/service/internal/graph/capabilities.go index ec8e2a576c05..fa9ed5ea1424 100644 --- a/service/internal/graph/capabilities.go +++ b/service/internal/graph/capabilities.go @@ -7,7 +7,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/service/internal/graph/attribute" + "go.opentelemetry.io/collector/service/internal/attribute" ) var _ consumerNode = (*capabilitiesNode)(nil) diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index df780a20b4b3..185598a619cf 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -13,10 +13,9 @@ import ( "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*connectorNode)(nil) @@ -31,9 +30,15 @@ type connectorNode struct { component.Component } -func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode { +func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID, metadata connector.Metadata) *connectorNode { + var attrs *attribute.Attributes + if metadata.SingletonInstance { + attrs = attribute.ConnectorSingleton(connID) + } else { + attrs = attribute.Connector(exprPipelineType, rcvrPipelineType, connID) + } return &connectorNode{ - Attributes: attribute.Connector(exprPipelineType, rcvrPipelineType, connID), + Attributes: attrs, componentID: connID, exprPipelineType: exprPipelineType, rcvrPipelineType: rcvrPipelineType, @@ -51,7 +56,7 @@ func (n *connectorNode) buildComponent( builder *builders.ConnectorBuilder, nexts []baseConsumer, ) error { - tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} switch n.rcvrPipelineType { case pipeline.SignalTraces: diff --git a/service/internal/graph/exporter.go b/service/internal/graph/exporter.go index 5598121c6cc1..373f18e760ab 100644 --- a/service/internal/graph/exporter.go +++ b/service/internal/graph/exporter.go @@ -11,9 +11,8 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*exporterNode)(nil) @@ -27,9 +26,15 @@ type exporterNode struct { component.Component } -func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { +func newExporterNode(pipelineType pipeline.Signal, exprID component.ID, metadata exporter.Metadata) *exporterNode { + var attrs *attribute.Attributes + if metadata.SingletonInstance { + attrs = attribute.ExporterSingleton(exprID) + } else { + attrs = attribute.Exporter(pipelineType, exprID) + } return &exporterNode{ - Attributes: attribute.Exporter(pipelineType, exprID), + Attributes: attrs, componentID: exprID, pipelineType: pipelineType, } @@ -45,7 +50,7 @@ func (n *exporterNode) buildComponent( info component.BuildInfo, builder *builders.ExporterBuilder, ) error { - tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error switch n.pipelineType { diff --git a/service/internal/graph/fanout.go b/service/internal/graph/fanout.go index 7d1a76f086f4..40aa7c01d296 100644 --- a/service/internal/graph/fanout.go +++ b/service/internal/graph/fanout.go @@ -5,7 +5,7 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph" import ( "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/service/internal/graph/attribute" + "go.opentelemetry.io/collector/service/internal/attribute" ) var _ consumerNode = (*fanOutNode)(nil) diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 2bc4e163d7e1..68c6d5fb9117 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -30,9 +30,11 @@ import ( "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/internal/fanoutconsumer" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" "go.opentelemetry.io/collector/service/internal/status" @@ -111,13 +113,22 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) continue } - rcvrNode := g.createReceiver(pipelineID, recvID) + factory := set.ReceiverBuilder.Factory(recvID.Type()) + if factory == nil { + return fmt.Errorf("receiver factory not available for: %q", recvID.Type()) + } + rcvrNode := g.createReceiver(pipelineID, recvID, set.ReceiverBuilder.Factory(recvID.Type()).Metadata()) pipe.receivers[rcvrNode.ID()] = rcvrNode } pipe.capabilitiesNode = newCapabilitiesNode(pipelineID) for _, procID := range pipelineCfg.Processors { + factory := set.ProcessorBuilder.Factory(procID.Type()) + if factory == nil { + return fmt.Errorf("processor factory not available for: %q", procID.Type()) + } + procNode := g.createProcessor(pipelineID, procID) pipe.processors = append(pipe.processors, procNode) } @@ -130,7 +141,11 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID) continue } - expNode := g.createExporter(pipelineID, exprID) + factory := set.ExporterBuilder.Factory(exprID.Type()) + if factory == nil { + return fmt.Errorf("exporter factory not available for: %q", exprID.Type()) + } + expNode := g.createExporter(pipelineID, exprID, factory.Metadata()) pipe.exporters[expNode.ID()] = expNode } } @@ -140,7 +155,6 @@ func (g *Graph) createNodes(set Settings) error { if factory == nil { return fmt.Errorf("connector factory not available for: %q", connID.Type()) } - connFactory := factory.(connector.Factory) expTypes := make(map[pipeline.Signal]bool) for _, pipelineID := range connectorsAsExporter[connID] { @@ -160,7 +174,7 @@ func (g *Graph) createNodes(set Settings) error { for expType := range expTypes { for recType := range recTypes { // Typechecks the connector's receiving and exporting datatypes. - if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined { + if connectorStability(factory, expType, recType) != component.StabilityLevelUndefined { expTypes[expType] = true recTypes[recType] = true } @@ -182,11 +196,11 @@ func (g *Graph) createNodes(set Settings) error { for _, eID := range connectorsAsExporter[connID] { for _, rID := range connectorsAsReceiver[connID] { - if connectorStability(connFactory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined { + if connectorStability(factory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined { // Connector is not supported for this combination, but we know it is used correctly elsewhere continue } - connNode := g.createConnector(eID, rID, connID) + connNode := g.createConnector(eID, rID, connID, factory.Metadata()) g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode @@ -196,8 +210,8 @@ func (g *Graph) createNodes(set Settings) error { return nil } -func (g *Graph) createReceiver(pipelineID pipeline.ID, recvID component.ID) *receiverNode { - rcvrNode := newReceiverNode(pipelineID.Signal(), recvID) +func (g *Graph) createReceiver(pipelineID pipeline.ID, recvID component.ID, metadata receiver.Metadata) *receiverNode { + rcvrNode := newReceiverNode(pipelineID.Signal(), recvID, metadata) if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { instanceID := g.instanceIDs[node.ID()] g.instanceIDs[node.ID()] = instanceID.WithPipelines(pipelineID) @@ -219,8 +233,8 @@ func (g *Graph) createProcessor(pipelineID pipeline.ID, procID component.ID) *pr return procNode } -func (g *Graph) createExporter(pipelineID pipeline.ID, exprID component.ID) *exporterNode { - expNode := newExporterNode(pipelineID.Signal(), exprID) +func (g *Graph) createExporter(pipelineID pipeline.ID, exprID component.ID, metadata exporter.Metadata) *exporterNode { + expNode := newExporterNode(pipelineID.Signal(), exprID, metadata) if node := g.componentGraph.Node(expNode.ID()); node != nil { instanceID := g.instanceIDs[expNode.ID()] g.instanceIDs[expNode.ID()] = instanceID.WithPipelines(pipelineID) @@ -233,8 +247,8 @@ func (g *Graph) createExporter(pipelineID pipeline.ID, exprID component.ID) *exp return expNode } -func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID pipeline.ID, connID component.ID) *connectorNode { - connNode := newConnectorNode(exprPipelineID.Signal(), rcvrPipelineID.Signal(), connID) +func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID pipeline.ID, connID component.ID, metadata connector.Metadata) *connectorNode { + connNode := newConnectorNode(exprPipelineID.Signal(), rcvrPipelineID.Signal(), connID, metadata) if node := g.componentGraph.Node(connNode.ID()); node != nil { instanceID := g.instanceIDs[connNode.ID()] g.instanceIDs[connNode.ID()] = instanceID.WithPipelines(exprPipelineID, rcvrPipelineID) diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index a1025e14e330..a57cf3dd983f 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -2157,11 +2157,11 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/conn" (logs to logs) -> ` + - `processor "nop" in pipeline "logs/1" -> ` + `connector "nop/conn1" (logs to logs) -> ` + `processor "nop" in pipeline "logs/2" -> ` + - `connector "nop/conn" (logs to logs)`, + `connector "nop/conn" (logs to logs) -> ` + + `processor "nop" in pipeline "logs/1" -> ` + + `connector "nop/conn1" (logs to logs)`, }, { name: "not_allowed_deep_cycle_profiles.yaml", @@ -2263,13 +2263,13 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/forkagain" (traces to traces) -> ` + - `processor "nop" in pipeline "traces/copy2b" -> ` + `connector "nop/rawlog" (traces to logs) -> ` + `processor "nop" in pipeline "logs/raw" -> ` + `connector "nop/fork" (logs to traces) -> ` + `processor "nop" in pipeline "traces/copy2" -> ` + - `connector "nop/forkagain" (traces to traces)`, + `connector "nop/forkagain" (traces to traces) -> ` + + `processor "nop" in pipeline "traces/copy2b" -> ` + + `connector "nop/rawlog" (traces to logs)`, }, { name: "unknown_exporter_config", @@ -2301,7 +2301,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("unknown")}, }, }, - expected: "failed to create \"unknown\" exporter for data type \"traces\": exporter factory not available for: \"unknown\"", + expected: "exporter factory not available for: \"unknown\"", }, { name: "unknown_processor_config", @@ -2341,7 +2341,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("nop")}, }, }, - expected: "failed to create \"unknown\" processor, in pipeline \"metrics\": processor factory not available for: \"unknown\"", + expected: "processor factory not available for: \"unknown\"", }, { name: "unknown_receiver_config", @@ -2373,7 +2373,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.MustNewID("nop")}, }, }, - expected: "failed to create \"unknown\" receiver for data type \"logs\": receiver factory not available for: \"unknown\"", + expected: "receiver factory not available for: \"unknown\"", }, { name: "unknown_connector_factory", diff --git a/service/internal/graph/processor.go b/service/internal/graph/processor.go index cfe36cb461c1..77af19e8d6cc 100644 --- a/service/internal/graph/processor.go +++ b/service/internal/graph/processor.go @@ -13,9 +13,8 @@ import ( "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) var _ consumerNode = (*processorNode)(nil) @@ -47,7 +46,7 @@ func (n *processorNode) buildComponent(ctx context.Context, builder *builders.ProcessorBuilder, next baseConsumer, ) error { - tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) + tel.Logger = n.Attributes.Logger(tel.Logger) set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error switch n.pipelineID.Signal() { diff --git a/service/internal/graph/receiver.go b/service/internal/graph/receiver.go index 57dfcded1170..f6ffd6c7fadb 100644 --- a/service/internal/graph/receiver.go +++ b/service/internal/graph/receiver.go @@ -14,9 +14,8 @@ import ( "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service/internal/attribute" "go.opentelemetry.io/collector/service/internal/builders" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/graph/attribute" ) // A receiver instance can be shared by multiple pipelines of the same type. @@ -28,9 +27,15 @@ type receiverNode struct { component.Component } -func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode { +func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID, metadata receiver.Metadata) *receiverNode { + var attrs *attribute.Attributes + if metadata.SingletonInstance { + attrs = attribute.ReceiverSingleton(recvID) + } else { + attrs = attribute.Receiver(pipelineType, recvID) + } return &receiverNode{ - Attributes: attribute.Receiver(pipelineType, recvID), + Attributes: attrs, componentID: recvID, pipelineType: pipelineType, } @@ -42,7 +47,7 @@ func (n *receiverNode) buildComponent(ctx context.Context, builder *builders.ReceiverBuilder, nexts []baseConsumer, ) error { - tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType) + tel.Logger = n.Attributes.Logger(tel.Logger) set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} var err error switch n.pipelineType {