Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add singleton flags to factories and standardize attributes #12057

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,17 @@ type Factory interface {
// LogsStability gets the stability level of the Logs exporter.
LogsStability() component.StabilityLevel

// Metadata returns the metadata describing the receiver.
Metadata() Metadata

unexportedFactoryFunc()
}

// Metadata contains metadata describing the component that is created by the factory.
type Metadata struct {
SingletonInstance bool
}

// FactoryOption apply changes to Factory.
type FactoryOption interface {
// applyOption applies the option.
Expand Down Expand Up @@ -132,6 +140,7 @@ type factory struct {
metricsStabilityLevel component.StabilityLevel
CreateLogsFunc
logsStabilityLevel component.StabilityLevel
metadata Metadata
}

func (f *factory) Type() component.Type {
Expand All @@ -152,6 +161,10 @@ func (f *factory) LogsStability() component.StabilityLevel {
return f.logsStabilityLevel
}

func (f *factory) Metadata() Metadata {
return f.metadata
}

// WithTraces overrides the default "error not supported" implementation for Factory.CreateTraces and the default "undefined" stability level.
func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand All @@ -176,6 +189,13 @@ func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOpt
})
}

// AsSingletonInstance indicates that the factory always returns the same instance of the component.
func AsSingletonInstance() FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.metadata.SingletonInstance = true
})
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down
8 changes: 6 additions & 2 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestNewFactory(t *testing.T) {
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
_, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
assert.False(t, f.Metadata().SingletonInstance)
}

func TestNewFactoryWithOptions(t *testing.T) {
Expand All @@ -39,7 +40,8 @@ func TestNewFactoryWithOptions(t *testing.T) {
func() component.Config { return &defaultCfg },
WithTraces(createTraces, component.StabilityLevelDevelopment),
WithMetrics(createMetrics, component.StabilityLevelAlpha),
WithLogs(createLogs, component.StabilityLevelDeprecated))
WithLogs(createLogs, component.StabilityLevelDeprecated),
AsSingletonInstance())
assert.EqualValues(t, testType, f.Type())
assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig())

Expand All @@ -53,7 +55,9 @@ func TestNewFactoryWithOptions(t *testing.T) {

assert.Equal(t, component.StabilityLevelDeprecated, f.LogsStability())
_, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg)
assert.NoError(t, err)
require.NoError(t, err)

assert.True(t, f.Metadata().SingletonInstance)
}

func TestMakeFactoryMap(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions exporter/xexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ func WithProfiles(createProfiles CreateProfilesFunc, sl component.StabilityLevel
})
}

// AsSingletonInstance sets the exporter as a singleton instance.
func AsSingletonInstance() FactoryOption {
return factoryOptionFunc(func(o *factoryOpts) {
o.opts = append(o.opts, exporter.AsSingletonInstance())
})
}

type factory struct {
exporter.Factory
CreateProfilesFunc
Expand Down
1 change: 1 addition & 0 deletions exporter/xexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestNewFactoryWithProfiles(t *testing.T) {
testType,
func() component.Config { return &defaultCfg },
WithProfiles(createProfiles, component.StabilityLevelDevelopment),
AsSingletonInstance(),
)
assert.EqualValues(t, testType, factory.Type())
assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig())
Expand Down
1 change: 1 addition & 0 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewFactory() receiver.Factory {
xreceiver.WithMetrics(createMetrics, metadata.MetricsStability),
xreceiver.WithLogs(createLog, metadata.LogsStability),
xreceiver.WithProfiles(createProfiles, metadata.ProfilesStability),
xreceiver.AsSingletonInstance(),
)
}

Expand Down
20 changes: 20 additions & 0 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,17 @@ type Factory interface {
// LogsStability gets the stability level of the Logs receiver.
LogsStability() component.StabilityLevel

// Metadata returns the metadata describing the receiver.
Metadata() Metadata

unexportedFactoryFunc()
}

// Metadata contains metadata describing the component that is created by the factory.
type Metadata struct {
SingletonInstance bool
}

// FactoryOption apply changes to Factory.
type FactoryOption interface {
// applyOption applies the option.
Expand Down Expand Up @@ -142,6 +150,7 @@ type factory struct {
metricsStabilityLevel component.StabilityLevel
CreateLogsFunc
logsStabilityLevel component.StabilityLevel
metadata Metadata
}

func (f *factory) Type() component.Type {
Expand All @@ -162,6 +171,10 @@ func (f *factory) LogsStability() component.StabilityLevel {
return f.logsStabilityLevel
}

func (f *factory) Metadata() Metadata {
return f.metadata
}

// WithTraces overrides the default "error not supported" implementation for Factory.CreateTraces and the default "undefined" stability level.
func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand All @@ -186,6 +199,13 @@ func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOpt
})
}

// AsSingletonInstance indicates that the factory always returns the same instance of the component.
func AsSingletonInstance() FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.metadata.SingletonInstance = true
})
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down
8 changes: 6 additions & 2 deletions receiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestNewFactory(t *testing.T) {
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
_, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop())
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
assert.False(t, f.Metadata().SingletonInstance)
}

func TestNewFactoryWithOptions(t *testing.T) {
Expand All @@ -40,7 +41,8 @@ func TestNewFactoryWithOptions(t *testing.T) {
func() component.Config { return &defaultCfg },
WithTraces(createTraces, component.StabilityLevelDeprecated),
WithMetrics(createMetrics, component.StabilityLevelAlpha),
WithLogs(createLogs, component.StabilityLevelStable))
WithLogs(createLogs, component.StabilityLevelStable),
AsSingletonInstance())
assert.EqualValues(t, testType, f.Type())
assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig())

Expand All @@ -54,7 +56,9 @@ func TestNewFactoryWithOptions(t *testing.T) {

assert.Equal(t, component.StabilityLevelStable, f.LogsStability())
_, err = f.CreateLogs(context.Background(), Settings{}, &defaultCfg, nil)
assert.NoError(t, err)
require.NoError(t, err)

assert.True(t, f.Metadata().SingletonInstance)
}

func TestMakeFactoryMap(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions receiver/xreceiver/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func WithProfiles(createProfiles CreateProfilesFunc, sl component.StabilityLevel
})
}

// AsSingletonInstance sets the receiver as a singleton instance.
func AsSingletonInstance() FactoryOption {
return factoryOptionFunc(func(o *factoryOpts) {
o.opts = append(o.opts, receiver.AsSingletonInstance())
})
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
opts := factoryOpts{factory: &factory{}}
Expand Down
1 change: 1 addition & 0 deletions receiver/xreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestNewFactoryWithProfiles(t *testing.T) {
testType,
func() component.Config { return &defaultCfg },
WithProfiles(createProfiles, component.StabilityLevelAlpha),
AsSingletonInstance(),
)
assert.EqualValues(t, testType, factory.Type())
assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig())
Expand Down
6 changes: 3 additions & 3 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensioncapabilities"
"go.opentelemetry.io/collector/service/internal/attribute"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/status"
"go.opentelemetry.io/collector/service/internal/zpages"
)
Expand All @@ -38,7 +38,7 @@ type Extensions struct {
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for _, extID := range bes.extensionIDs {
extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID)
extLogger := attribute.Extension(extID).Logger(bes.telemetry.Logger)
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
Expand Down Expand Up @@ -216,7 +216,7 @@ func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Ext
BuildInfo: set.BuildInfo,
ModuleInfo: set.ModuleInfo,
}
extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID)
extSet.TelemetrySettings.Logger = attribute.Extension(extID).Logger(set.Telemetry.Logger)

ext, err := set.Extensions.Create(ctx, extSet)
if err != nil {
Expand Down
126 changes: 126 additions & 0 deletions service/internal/attribute/attribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute // import "go.opentelemetry.io/collector/service/internal/attribute"

import (
"hash/fnv"

"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pipeline"
)

const (
componentKindKey = "otelcol.component.kind"
componentIDKey = "otelcol.component.id"
pipelineIDKey = "otelcol.pipeline.id"
signalKey = "otelcol.signal"
signalOutputKey = "otelcol.signal.output"

capabiltiesKind = "capabilities"
fanoutKind = "fanout"
)

type Attributes struct {
set attribute.Set
id int64
}

func newAttributes(attrs ...attribute.KeyValue) *Attributes {
h := fnv.New64a()
for _, kv := range attrs {
h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")"))
}
return &Attributes{
set: attribute.NewSet(attrs...),
id: int64(h.Sum64()), // #nosec G115
}
}

func (a Attributes) Attributes() *attribute.Set {
return &a.set
}

func (a Attributes) ID() int64 {
return a.id
}

func (a Attributes) Logger(logger *zap.Logger) *zap.Logger {
fields := make([]zap.Field, 0, a.set.Len())
for _, kv := range a.set.ToSlice() {
fields = append(fields, zap.String(string(kv.Key), kv.Value.AsString()))
}
return logger.With(fields...)
}

func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindReceiver.String()),
attribute.String(signalKey, pipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func ReceiverSingleton(id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindReceiver.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Processor(pipelineID pipeline.ID, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindProcessor.String()),
attribute.String(signalKey, pipelineID.Signal().String()),
attribute.String(pipelineIDKey, pipelineID.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindExporter.String()),
attribute.String(signalKey, pipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func ExporterSingleton(id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindExporter.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindConnector.String()),
attribute.String(signalKey, exprPipelineType.String()),
attribute.String(signalOutputKey, rcvrPipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Capabilities(pipelineID pipeline.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, capabiltiesKind),
attribute.String(pipelineIDKey, pipelineID.String()),
)
}

func Fanout(pipelineID pipeline.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, fanoutKind),
attribute.String(pipelineIDKey, pipelineID.String()),
)
}

func Extension(id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, component.KindExtension.String()),
attribute.String(componentIDKey, id.String()),
)
}
Loading
Loading