diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index 23d7d08c79e..417137a8919 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -1,8 +1,9 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package sharedcomponent exposes util functionality for receivers and exporters -// that need to share state between different signal types instances such as net.Listener or os.File. +// Package sharedcomponent exposes functionality for components +// to register against a shared key, such as a configuration object, in order to be reused across signal types. +// This is particularly useful when the component relies on a shared resource such as os.File or http.Server. package sharedcomponent // import "go.opentelemetry.io/collector/internal/sharedcomponent" import ( @@ -12,23 +13,21 @@ import ( "go.opentelemetry.io/collector/component" ) -// SharedComponents a map that keeps reference of all created instances for a given configuration, -// and ensures that the shared state is started and stopped only once. -type SharedComponents[K comparable, V component.Component] struct { - comps map[K]*SharedComponent[V] +func NewMap[K comparable, V component.Component]() *Map[K, V] { + return &Map[K, V]{ + components: map[K]*Component[V]{}, + } } -// NewSharedComponents returns a new empty SharedComponents. -func NewSharedComponents[K comparable, V component.Component]() *SharedComponents[K, V] { - return &SharedComponents[K, V]{ - comps: make(map[K]*SharedComponent[V]), - } +// Map keeps reference of all created instances for a given shared key such as a component configuration. +type Map[K comparable, V component.Component] struct { + components map[K]*Component[V] } -// GetOrAdd returns the already created instance if exists, otherwise creates a new instance +// LoadOrStore returns the already created instance if exists, otherwise creates a new instance // and adds it to the map of references. -func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) { - if c, ok := scs.comps[key]; ok { +func (m *Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) { + if c, ok := m.components[key]; ok { // If we haven't already seen this telemetry settings, this shared component represents // another instance. Wrap ReportComponentStatus to report for all instances this shared // component represents. @@ -50,23 +49,23 @@ func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), tel return nil, err } - newComp := &SharedComponent[V]{ + newComp := &Component[V]{ component: comp, removeFunc: func() { - delete(scs.comps, key) + delete(m.components, key) }, telemetry: telemetrySettings, seenSettings: map[*component.TelemetrySettings]struct{}{ telemetrySettings: {}, }, } - scs.comps[key] = newComp + m.components[key] = newComp return newComp, nil } -// SharedComponent ensures that the wrapped component is started and stopped only once. -// When stopped it is removed from the SharedComponents map. -type SharedComponent[V component.Component] struct { +// Component ensures that the wrapped component is started and stopped only once. +// When stopped it is removed from the Map. +type Component[V component.Component] struct { component V startOnce sync.Once @@ -78,42 +77,42 @@ type SharedComponent[V component.Component] struct { } // Unwrap returns the original component. -func (r *SharedComponent[V]) Unwrap() V { - return r.component +func (c *Component[V]) Unwrap() V { + return c.component } -// Start implements component.Component. -func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) error { +// Start starts the underlying component if it never started before. +func (c *Component[V]) Start(ctx context.Context, host component.Host) error { var err error - r.startOnce.Do(func() { - // It's important that status for a sharedcomponent is reported through its - // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates + c.startOnce.Do(func() { + // It's important that status for a shared component is reported through its + // telemetry settings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the // status reporting in graph a no-op. - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) - if err = r.component.Start(ctx, host); err != nil { - _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) + _ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) + if err = c.component.Start(ctx, host); err != nil { + _ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) } }) return err } -// Shutdown implements component.Component. -func (r *SharedComponent[V]) Shutdown(ctx context.Context) error { +// Shutdown shuts down the underlying component. +func (c *Component[V]) Shutdown(ctx context.Context) error { var err error - r.stopOnce.Do(func() { - // It's important that status for a sharedcomponent is reported through its - // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates + c.stopOnce.Do(func() { + // It's important that status for a shared component is reported through its + // telemetry settings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the - // the status reporting in graph a no-op. - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) - err = r.component.Shutdown(ctx) + // status reporting in graph a no-op. + _ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) + err = c.component.Shutdown(ctx) if err != nil { - _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) + _ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) } else { - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped)) + _ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped)) } - r.removeFunc() + c.removeFunc() }) return err } diff --git a/internal/sharedcomponent/sharedcomponent_test.go b/internal/sharedcomponent/sharedcomponent_test.go index 5ce081fa752..7254bcfcbf3 100644 --- a/internal/sharedcomponent/sharedcomponent_test.go +++ b/internal/sharedcomponent/sharedcomponent_test.go @@ -23,37 +23,37 @@ type baseComponent struct { telemetry *component.TelemetrySettings } -func TestNewSharedComponents(t *testing.T) { - comps := NewSharedComponents[component.ID, *baseComponent]() - assert.Len(t, comps.comps, 0) +func TestNewMap(t *testing.T) { + comps := NewMap[component.ID, *baseComponent]() + assert.Len(t, comps.components, 0) } func TestNewSharedComponentsCreateError(t *testing.T) { - comps := NewSharedComponents[component.ID, *baseComponent]() - assert.Len(t, comps.comps, 0) + comps := NewMap[component.ID, *baseComponent]() + assert.Len(t, comps.components, 0) myErr := errors.New("my error") - _, err := comps.GetOrAdd( + _, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nil, myErr }, newNopTelemetrySettings(), ) assert.ErrorIs(t, err, myErr) - assert.Len(t, comps.comps, 0) + assert.Len(t, comps.components, 0) } -func TestSharedComponentsGetOrAdd(t *testing.T) { +func TestSharedComponentsLoadOrStore(t *testing.T) { nop := &baseComponent{} - comps := NewSharedComponents[component.ID, *baseComponent]() - got, err := comps.GetOrAdd( + comps := NewMap[component.ID, *baseComponent]() + got, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nop, nil }, newNopTelemetrySettings(), ) require.NoError(t, err) - assert.Len(t, comps.comps, 1) + assert.Len(t, comps.components, 1) assert.Same(t, nop, got.Unwrap()) - gotSecond, err := comps.GetOrAdd( + gotSecond, err := comps.LoadOrStore( id, func() (*baseComponent, error) { panic("should not be called") }, newNopTelemetrySettings(), @@ -64,8 +64,8 @@ func TestSharedComponentsGetOrAdd(t *testing.T) { // Shutdown nop will remove assert.NoError(t, got.Shutdown(context.Background())) - assert.Len(t, comps.comps, 0) - gotThird, err := comps.GetOrAdd( + assert.Len(t, comps.components, 0) + gotThird, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nop, nil }, newNopTelemetrySettings(), @@ -88,8 +88,8 @@ func TestSharedComponent(t *testing.T) { return wantErr }} - comps := NewSharedComponents[component.ID, *baseComponent]() - got, err := comps.GetOrAdd( + comps := NewMap[component.ID, *baseComponent]() + got, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return comp, nil }, newNopTelemetrySettings(), @@ -100,13 +100,13 @@ func TestSharedComponent(t *testing.T) { // Second time is not called anymore. assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) assert.Equal(t, 1, calledStart) + // first time, shutdown is called. assert.Equal(t, wantErr, got.Shutdown(context.Background())) assert.Equal(t, 1, calledStop) // Second time is not called anymore. assert.NoError(t, got.Shutdown(context.Background())) assert.Equal(t, 1, calledStop) } - func TestSharedComponentsReportStatus(t *testing.T) { reportedStatuses := make(map[*component.InstanceID][]component.Status) newStatusFunc := func() func(*component.StatusEvent) error { @@ -122,7 +122,7 @@ func TestSharedComponentsReportStatus(t *testing.T) { } comp := &baseComponent{} - comps := NewSharedComponents[component.ID, *baseComponent]() + comps := NewMap[component.ID, *baseComponent]() var telemetrySettings *component.TelemetrySettings // make a shared component that represents three instances @@ -130,23 +130,23 @@ func TestSharedComponentsReportStatus(t *testing.T) { telemetrySettings = newNopTelemetrySettings() telemetrySettings.ReportComponentStatus = newStatusFunc() // The initial settings for the shared component need to match the ones passed to the first - // invocation of GetOrAdd so that underlying telemetry settings reference can be used to + // invocation of LoadOrStore so that underlying telemetry settings reference can be used to // wrap ReportComponentStatus for subsequently added "instances". if i == 0 { comp.telemetry = telemetrySettings } - got, err := comps.GetOrAdd( + got, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return comp, nil }, telemetrySettings, ) require.NoError(t, err) - assert.Len(t, comps.comps, 1) + assert.Len(t, comps.components, 1) assert.Same(t, comp, got.Unwrap()) } // make sure we don't try to represent a fourth instance if we reuse a telemetrySettings - _, _ = comps.GetOrAdd( + _, _ = comps.LoadOrStore( id, func() (*baseComponent, error) { return comp, nil }, telemetrySettings, @@ -245,8 +245,8 @@ func TestReportStatusOnStartShutdown(t *testing.T) { return tc.shutdownErr } } - comps := NewSharedComponents[component.ID, *baseComponent]() - var comp *SharedComponent[*baseComponent] + comps := NewMap[component.ID, *baseComponent]() + var comp *Component[*baseComponent] var err error for i := 0; i < 3; i++ { telemetrySettings := newNopTelemetrySettings() @@ -254,7 +254,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) { if i == 0 { base.telemetry = telemetrySettings } - comp, err = comps.GetOrAdd( + comp, err = comps.LoadOrStore( id, func() (*baseComponent, error) { return base, nil }, telemetrySettings, diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index cce8b363cd4..125c7c9f296 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -68,7 +68,7 @@ func createTraces( nextConsumer consumer.Traces, ) (receiver.Traces, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( + r, err := receivers.LoadOrStore( oCfg, func() (*otlpReceiver, error) { return newOtlpReceiver(oCfg, &set) @@ -93,7 +93,7 @@ func createMetrics( consumer consumer.Metrics, ) (receiver.Metrics, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( + r, err := receivers.LoadOrStore( oCfg, func() (*otlpReceiver, error) { return newOtlpReceiver(oCfg, &set) @@ -118,7 +118,7 @@ func createLog( consumer consumer.Logs, ) (receiver.Logs, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( + r, err := receivers.LoadOrStore( oCfg, func() (*otlpReceiver, error) { return newOtlpReceiver(oCfg, &set) @@ -141,4 +141,4 @@ func createLog( // create separate objects, they must use one otlpReceiver object per configuration. // When the receiver is shutdown it should be removed from this map so the same configuration // can be recreated successfully. -var receivers = sharedcomponent.NewSharedComponents[*Config, *otlpReceiver]() +var receivers = sharedcomponent.NewMap[*Config, *otlpReceiver]()