From 598741382bf6bab9903a96472836eaf18847a884 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 19 Dec 2023 22:15:59 -0800 Subject: [PATCH 1/6] [chore] polish sharedcomponent API --- internal/sharedcomponent/sharedcomponent.go | 53 ++++++------ .../sharedcomponent/sharedcomponent_test.go | 82 ++++++++++++++----- receiver/otlpreceiver/factory.go | 8 +- 3 files changed, 92 insertions(+), 51 deletions(-) diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index 23d7d08c79e..4f4041e2308 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -1,34 +1,26 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package sharedcomponent exposes util functionality for receivers and exporters -// that need to share state between different signal types instances such as net.Listener or os.File. +// Package sharedcomponent exposes functionality for components +// to register against a shared key, such as a configuration object, in order to be reused across signal types. +// This is particularly useful when the component relies on a shared resource such as os.File or http.Server. package sharedcomponent // import "go.opentelemetry.io/collector/internal/sharedcomponent" import ( "context" "sync" + "sync/atomic" "go.opentelemetry.io/collector/component" ) -// SharedComponents a map that keeps reference of all created instances for a given configuration, -// and ensures that the shared state is started and stopped only once. -type SharedComponents[K comparable, V component.Component] struct { - comps map[K]*SharedComponent[V] -} - -// NewSharedComponents returns a new empty SharedComponents. -func NewSharedComponents[K comparable, V component.Component]() *SharedComponents[K, V] { - return &SharedComponents[K, V]{ - comps: make(map[K]*SharedComponent[V]), - } -} +// Map keeps reference of all created instances for a given shared key such as a component configuration. +type Map[K comparable, V component.Component] map[K]*SharedComponent[V] -// GetOrAdd returns the already created instance if exists, otherwise creates a new instance +// LoadOrStore returns the already created instance if exists, otherwise creates a new instance // and adds it to the map of references. -func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) { - if c, ok := scs.comps[key]; ok { +func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) { + if c, ok := scs[key]; ok { // If we haven't already seen this telemetry settings, this shared component represents // another instance. Wrap ReportComponentStatus to report for all instances this shared // component represents. @@ -53,25 +45,27 @@ func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), tel newComp := &SharedComponent[V]{ component: comp, removeFunc: func() { - delete(scs.comps, key) + delete(scs, key) }, telemetry: telemetrySettings, seenSettings: map[*component.TelemetrySettings]struct{}{ telemetrySettings: {}, }, + activeCount: &atomic.Int32{}, } - scs.comps[key] = newComp + scs[key] = newComp return newComp, nil } // SharedComponent ensures that the wrapped component is started and stopped only once. -// When stopped it is removed from the SharedComponents map. +// When stopped it is removed from the Map. type SharedComponent[V component.Component] struct { component V - startOnce sync.Once - stopOnce sync.Once - removeFunc func() + activeCount *atomic.Int32 // a counter keeping track of the number of active uses of the component + startOnce sync.Once + stopOnce sync.Once + removeFunc func() telemetry *component.TelemetrySettings seenSettings map[*component.TelemetrySettings]struct{} @@ -82,7 +76,8 @@ func (r *SharedComponent[V]) Unwrap() V { return r.component } -// Start implements component.Component. +// Start starts the underlying component if it never started before. Each call to Start is counted as an active usage. +// Shutdown will shut down the underlying component if called as many times as Start is called. func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) error { var err error r.startOnce.Do(func() { @@ -95,17 +90,23 @@ func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) err _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) } }) + r.activeCount.Add(1) return err } -// Shutdown implements component.Component. +// Shutdown shuts down the underlying component if all known usages, measured by the number of times +// Start was called, are accounted for. func (r *SharedComponent[V]) Shutdown(ctx context.Context) error { + if r.activeCount.Add(-1) > 0 { + return nil + } + var err error r.stopOnce.Do(func() { // It's important that status for a sharedcomponent is reported through its // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the - // the status reporting in graph a no-op. + // status reporting in graph a no-op. _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) err = r.component.Shutdown(ctx) if err != nil { diff --git a/internal/sharedcomponent/sharedcomponent_test.go b/internal/sharedcomponent/sharedcomponent_test.go index 5ce081fa752..bf6803a253a 100644 --- a/internal/sharedcomponent/sharedcomponent_test.go +++ b/internal/sharedcomponent/sharedcomponent_test.go @@ -24,36 +24,36 @@ type baseComponent struct { } func TestNewSharedComponents(t *testing.T) { - comps := NewSharedComponents[component.ID, *baseComponent]() - assert.Len(t, comps.comps, 0) + comps := Map[component.ID, *baseComponent]{} + assert.Len(t, comps, 0) } func TestNewSharedComponentsCreateError(t *testing.T) { - comps := NewSharedComponents[component.ID, *baseComponent]() - assert.Len(t, comps.comps, 0) + comps := Map[component.ID, *baseComponent]{} + assert.Len(t, comps, 0) myErr := errors.New("my error") - _, err := comps.GetOrAdd( + _, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nil, myErr }, newNopTelemetrySettings(), ) assert.ErrorIs(t, err, myErr) - assert.Len(t, comps.comps, 0) + assert.Len(t, comps, 0) } func TestSharedComponentsGetOrAdd(t *testing.T) { nop := &baseComponent{} - comps := NewSharedComponents[component.ID, *baseComponent]() - got, err := comps.GetOrAdd( + comps := Map[component.ID, *baseComponent]{} + got, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nop, nil }, newNopTelemetrySettings(), ) require.NoError(t, err) - assert.Len(t, comps.comps, 1) + assert.Len(t, comps, 1) assert.Same(t, nop, got.Unwrap()) - gotSecond, err := comps.GetOrAdd( + gotSecond, err := comps.LoadOrStore( id, func() (*baseComponent, error) { panic("should not be called") }, newNopTelemetrySettings(), @@ -64,8 +64,8 @@ func TestSharedComponentsGetOrAdd(t *testing.T) { // Shutdown nop will remove assert.NoError(t, got.Shutdown(context.Background())) - assert.Len(t, comps.comps, 0) - gotThird, err := comps.GetOrAdd( + assert.Len(t, comps, 0) + gotThird, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nop, nil }, newNopTelemetrySettings(), @@ -88,8 +88,8 @@ func TestSharedComponent(t *testing.T) { return wantErr }} - comps := NewSharedComponents[component.ID, *baseComponent]() - got, err := comps.GetOrAdd( + comps := Map[component.ID, *baseComponent]{} + got, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return comp, nil }, newNopTelemetrySettings(), @@ -100,9 +100,49 @@ func TestSharedComponent(t *testing.T) { // Second time is not called anymore. assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) assert.Equal(t, 1, calledStart) + // first time, shutdown is not called. + assert.NoError(t, got.Shutdown(context.Background())) + assert.Equal(t, 0, calledStop) + // Second time is not called anymore. assert.Equal(t, wantErr, got.Shutdown(context.Background())) assert.Equal(t, 1, calledStop) +} + +func TestSharedComponentLateShutdown(t *testing.T) { + calledStart := 0 + calledStop := 0 + comp := &baseComponent{ + StartFunc: func(ctx context.Context, host component.Host) error { + calledStart++ + return nil + }, + ShutdownFunc: func(ctx context.Context) error { + calledStop++ + return nil + }} + + comps := Map[component.ID, *baseComponent]{} + got, err := comps.LoadOrStore( + id, + func() (*baseComponent, error) { return comp, nil }, + newNopTelemetrySettings(), + ) + require.NoError(t, err) + assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) // Second time is not called anymore. + assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + // there is still one use, so stop is not called. + assert.NoError(t, got.Shutdown(context.Background())) + assert.Equal(t, 0, calledStop) + // start one more time: + assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, 1, calledStart) + // there is still one use, so stop is not called. + assert.NoError(t, got.Shutdown(context.Background())) + assert.Equal(t, 0, calledStop) + // finally close all active uses assert.NoError(t, got.Shutdown(context.Background())) assert.Equal(t, 1, calledStop) } @@ -122,7 +162,7 @@ func TestSharedComponentsReportStatus(t *testing.T) { } comp := &baseComponent{} - comps := NewSharedComponents[component.ID, *baseComponent]() + comps := Map[component.ID, *baseComponent]{} var telemetrySettings *component.TelemetrySettings // make a shared component that represents three instances @@ -130,23 +170,23 @@ func TestSharedComponentsReportStatus(t *testing.T) { telemetrySettings = newNopTelemetrySettings() telemetrySettings.ReportComponentStatus = newStatusFunc() // The initial settings for the shared component need to match the ones passed to the first - // invocation of GetOrAdd so that underlying telemetry settings reference can be used to + // invocation of LoadOrStore so that underlying telemetry settings reference can be used to // wrap ReportComponentStatus for subsequently added "instances". if i == 0 { comp.telemetry = telemetrySettings } - got, err := comps.GetOrAdd( + got, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return comp, nil }, telemetrySettings, ) require.NoError(t, err) - assert.Len(t, comps.comps, 1) + assert.Len(t, comps, 1) assert.Same(t, comp, got.Unwrap()) } // make sure we don't try to represent a fourth instance if we reuse a telemetrySettings - _, _ = comps.GetOrAdd( + _, _ = comps.LoadOrStore( id, func() (*baseComponent, error) { return comp, nil }, telemetrySettings, @@ -245,7 +285,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) { return tc.shutdownErr } } - comps := NewSharedComponents[component.ID, *baseComponent]() + comps := Map[component.ID, *baseComponent]{} var comp *SharedComponent[*baseComponent] var err error for i := 0; i < 3; i++ { @@ -254,7 +294,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) { if i == 0 { base.telemetry = telemetrySettings } - comp, err = comps.GetOrAdd( + comp, err = comps.LoadOrStore( id, func() (*baseComponent, error) { return base, nil }, telemetrySettings, diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index cce8b363cd4..ef20d592c13 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -68,7 +68,7 @@ func createTraces( nextConsumer consumer.Traces, ) (receiver.Traces, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( + r, err := receivers.LoadOrStore( oCfg, func() (*otlpReceiver, error) { return newOtlpReceiver(oCfg, &set) @@ -93,7 +93,7 @@ func createMetrics( consumer consumer.Metrics, ) (receiver.Metrics, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( + r, err := receivers.LoadOrStore( oCfg, func() (*otlpReceiver, error) { return newOtlpReceiver(oCfg, &set) @@ -118,7 +118,7 @@ func createLog( consumer consumer.Logs, ) (receiver.Logs, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd( + r, err := receivers.LoadOrStore( oCfg, func() (*otlpReceiver, error) { return newOtlpReceiver(oCfg, &set) @@ -141,4 +141,4 @@ func createLog( // create separate objects, they must use one otlpReceiver object per configuration. // When the receiver is shutdown it should be removed from this map so the same configuration // can be recreated successfully. -var receivers = sharedcomponent.NewSharedComponents[*Config, *otlpReceiver]() +var receivers = sharedcomponent.Map[*Config, *otlpReceiver]{} From 38e2d7e659bd812048dad3ef63f58fbd3a049d3f Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 20 Dec 2023 10:50:33 -0800 Subject: [PATCH 2/6] SharedComponent -> Component --- internal/sharedcomponent/sharedcomponent.go | 24 +++++++++---------- .../sharedcomponent/sharedcomponent_test.go | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index 4f4041e2308..f9a607e57b5 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -15,11 +15,11 @@ import ( ) // Map keeps reference of all created instances for a given shared key such as a component configuration. -type Map[K comparable, V component.Component] map[K]*SharedComponent[V] +type Map[K comparable, V component.Component] map[K]*Component[V] // LoadOrStore returns the already created instance if exists, otherwise creates a new instance // and adds it to the map of references. -func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) { +func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) { if c, ok := scs[key]; ok { // If we haven't already seen this telemetry settings, this shared component represents // another instance. Wrap ReportComponentStatus to report for all instances this shared @@ -42,7 +42,7 @@ func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySetti return nil, err } - newComp := &SharedComponent[V]{ + newComp := &Component[V]{ component: comp, removeFunc: func() { delete(scs, key) @@ -57,9 +57,9 @@ func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySetti return newComp, nil } -// SharedComponent ensures that the wrapped component is started and stopped only once. +// Component ensures that the wrapped component is started and stopped only once. // When stopped it is removed from the Map. -type SharedComponent[V component.Component] struct { +type Component[V component.Component] struct { component V activeCount *atomic.Int32 // a counter keeping track of the number of active uses of the component @@ -72,17 +72,17 @@ type SharedComponent[V component.Component] struct { } // Unwrap returns the original component. -func (r *SharedComponent[V]) Unwrap() V { +func (r *Component[V]) Unwrap() V { return r.component } // Start starts the underlying component if it never started before. Each call to Start is counted as an active usage. // Shutdown will shut down the underlying component if called as many times as Start is called. -func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) error { +func (r *Component[V]) Start(ctx context.Context, host component.Host) error { var err error r.startOnce.Do(func() { - // It's important that status for a sharedcomponent is reported through its - // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates + // It's important that status for a shared component is reported through its + // telemetry settings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the // status reporting in graph a no-op. _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) @@ -96,15 +96,15 @@ func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) err // Shutdown shuts down the underlying component if all known usages, measured by the number of times // Start was called, are accounted for. -func (r *SharedComponent[V]) Shutdown(ctx context.Context) error { +func (r *Component[V]) Shutdown(ctx context.Context) error { if r.activeCount.Add(-1) > 0 { return nil } var err error r.stopOnce.Do(func() { - // It's important that status for a sharedcomponent is reported through its - // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates + // It's important that status for a shared component is reported through its + // telemetry settings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the // status reporting in graph a no-op. _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) diff --git a/internal/sharedcomponent/sharedcomponent_test.go b/internal/sharedcomponent/sharedcomponent_test.go index bf6803a253a..35e2964df5c 100644 --- a/internal/sharedcomponent/sharedcomponent_test.go +++ b/internal/sharedcomponent/sharedcomponent_test.go @@ -286,7 +286,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) { } } comps := Map[component.ID, *baseComponent]{} - var comp *SharedComponent[*baseComponent] + var comp *Component[*baseComponent] var err error for i := 0; i < 3; i++ { telemetrySettings := newNopTelemetrySettings() From 78c7fc2dceda193670b6921c90ac2c2b90f38cd9 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 20 Dec 2023 11:47:19 -0800 Subject: [PATCH 3/6] reduce scope of changes --- internal/sharedcomponent/sharedcomponent.go | 20 +++------ .../sharedcomponent/sharedcomponent_test.go | 44 +------------------ 2 files changed, 7 insertions(+), 57 deletions(-) diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index f9a607e57b5..187e7cd1f55 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -9,7 +9,6 @@ package sharedcomponent // import "go.opentelemetry.io/collector/internal/shared import ( "context" "sync" - "sync/atomic" "go.opentelemetry.io/collector/component" ) @@ -51,7 +50,6 @@ func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySetti seenSettings: map[*component.TelemetrySettings]struct{}{ telemetrySettings: {}, }, - activeCount: &atomic.Int32{}, } scs[key] = newComp return newComp, nil @@ -62,10 +60,9 @@ func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySetti type Component[V component.Component] struct { component V - activeCount *atomic.Int32 // a counter keeping track of the number of active uses of the component - startOnce sync.Once - stopOnce sync.Once - removeFunc func() + startOnce sync.Once + stopOnce sync.Once + removeFunc func() telemetry *component.TelemetrySettings seenSettings map[*component.TelemetrySettings]struct{} @@ -76,8 +73,7 @@ func (r *Component[V]) Unwrap() V { return r.component } -// Start starts the underlying component if it never started before. Each call to Start is counted as an active usage. -// Shutdown will shut down the underlying component if called as many times as Start is called. +// Start starts the underlying component if it never started before. func (r *Component[V]) Start(ctx context.Context, host component.Host) error { var err error r.startOnce.Do(func() { @@ -90,17 +86,11 @@ func (r *Component[V]) Start(ctx context.Context, host component.Host) error { _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) } }) - r.activeCount.Add(1) return err } -// Shutdown shuts down the underlying component if all known usages, measured by the number of times -// Start was called, are accounted for. +// Shutdown shuts down the underlying component. func (r *Component[V]) Shutdown(ctx context.Context) error { - if r.activeCount.Add(-1) > 0 { - return nil - } - var err error r.stopOnce.Do(func() { // It's important that status for a shared component is reported through its diff --git a/internal/sharedcomponent/sharedcomponent_test.go b/internal/sharedcomponent/sharedcomponent_test.go index 35e2964df5c..78cc614652b 100644 --- a/internal/sharedcomponent/sharedcomponent_test.go +++ b/internal/sharedcomponent/sharedcomponent_test.go @@ -41,7 +41,7 @@ func TestNewSharedComponentsCreateError(t *testing.T) { assert.Len(t, comps, 0) } -func TestSharedComponentsGetOrAdd(t *testing.T) { +func TestSharedComponentsLoadOrStore(t *testing.T) { nop := &baseComponent{} comps := Map[component.ID, *baseComponent]{} @@ -100,53 +100,13 @@ func TestSharedComponent(t *testing.T) { // Second time is not called anymore. assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) assert.Equal(t, 1, calledStart) - // first time, shutdown is not called. - assert.NoError(t, got.Shutdown(context.Background())) - assert.Equal(t, 0, calledStop) - // Second time is not called anymore. + // first time, shutdown is called. assert.Equal(t, wantErr, got.Shutdown(context.Background())) assert.Equal(t, 1, calledStop) -} - -func TestSharedComponentLateShutdown(t *testing.T) { - calledStart := 0 - calledStop := 0 - comp := &baseComponent{ - StartFunc: func(ctx context.Context, host component.Host) error { - calledStart++ - return nil - }, - ShutdownFunc: func(ctx context.Context) error { - calledStop++ - return nil - }} - - comps := Map[component.ID, *baseComponent]{} - got, err := comps.LoadOrStore( - id, - func() (*baseComponent, error) { return comp, nil }, - newNopTelemetrySettings(), - ) - require.NoError(t, err) - assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) - assert.Equal(t, 1, calledStart) // Second time is not called anymore. - assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) - assert.Equal(t, 1, calledStart) - // there is still one use, so stop is not called. - assert.NoError(t, got.Shutdown(context.Background())) - assert.Equal(t, 0, calledStop) - // start one more time: - assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost())) - assert.Equal(t, 1, calledStart) - // there is still one use, so stop is not called. - assert.NoError(t, got.Shutdown(context.Background())) - assert.Equal(t, 0, calledStop) - // finally close all active uses assert.NoError(t, got.Shutdown(context.Background())) assert.Equal(t, 1, calledStop) } - func TestSharedComponentsReportStatus(t *testing.T) { reportedStatuses := make(map[*component.InstanceID][]component.Status) newStatusFunc := func() func(*component.StatusEvent) error { From 1d6718b6c7c9bc9b4a7df67c116d13fe43a27d05 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 20 Dec 2023 12:00:37 -0800 Subject: [PATCH 4/6] rename receivers --- internal/sharedcomponent/sharedcomponent.go | 36 ++++++++++----------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index 187e7cd1f55..3a48a14075d 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -18,8 +18,8 @@ type Map[K comparable, V component.Component] map[K]*Component[V] // LoadOrStore returns the already created instance if exists, otherwise creates a new instance // and adds it to the map of references. -func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) { - if c, ok := scs[key]; ok { +func (m Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) { + if c, ok := m[key]; ok { // If we haven't already seen this telemetry settings, this shared component represents // another instance. Wrap ReportComponentStatus to report for all instances this shared // component represents. @@ -44,14 +44,14 @@ func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySetti newComp := &Component[V]{ component: comp, removeFunc: func() { - delete(scs, key) + delete(m, key) }, telemetry: telemetrySettings, seenSettings: map[*component.TelemetrySettings]struct{}{ telemetrySettings: {}, }, } - scs[key] = newComp + m[key] = newComp return newComp, nil } @@ -69,42 +69,42 @@ type Component[V component.Component] struct { } // Unwrap returns the original component. -func (r *Component[V]) Unwrap() V { - return r.component +func (c *Component[V]) Unwrap() V { + return c.component } // Start starts the underlying component if it never started before. -func (r *Component[V]) Start(ctx context.Context, host component.Host) error { +func (c *Component[V]) Start(ctx context.Context, host component.Host) error { var err error - r.startOnce.Do(func() { + c.startOnce.Do(func() { // It's important that status for a shared component is reported through its // telemetry settings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the // status reporting in graph a no-op. - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) - if err = r.component.Start(ctx, host); err != nil { - _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) + _ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) + if err = c.component.Start(ctx, host); err != nil { + _ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) } }) return err } // Shutdown shuts down the underlying component. -func (r *Component[V]) Shutdown(ctx context.Context) error { +func (c *Component[V]) Shutdown(ctx context.Context) error { var err error - r.stopOnce.Do(func() { + c.stopOnce.Do(func() { // It's important that status for a shared component is reported through its // telemetry settings to keep status in sync and avoid race conditions. This logic duplicates // and takes priority over the automated status reporting that happens in graph, making the // status reporting in graph a no-op. - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) - err = r.component.Shutdown(ctx) + _ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) + err = c.component.Shutdown(ctx) if err != nil { - _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) + _ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) } else { - _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped)) + _ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped)) } - r.removeFunc() + c.removeFunc() }) return err } From eb539b5f206f8224144ca9020f2ba38bb14ae9f4 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 20 Dec 2023 16:51:17 -0800 Subject: [PATCH 5/6] move from builtin map to struct --- internal/sharedcomponent/sharedcomponent.go | 26 ++++++++++++++----- .../sharedcomponent/sharedcomponent_test.go | 26 +++++++++---------- receiver/otlpreceiver/factory.go | 2 +- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index 3a48a14075d..098de1f5b70 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -14,12 +14,24 @@ import ( ) // Map keeps reference of all created instances for a given shared key such as a component configuration. -type Map[K comparable, V component.Component] map[K]*Component[V] +type Map[K comparable, V component.Component] interface { + // LoadOrStore returns the already created instance if exists, otherwise creates a new instance + // and adds it to the map of references. + LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) +} + +func NewMap[K comparable, V component.Component]() Map[K, V] { + return &mapImpl[K, V]{ + components: map[K]*Component[V]{}, + } +} + +type mapImpl[K comparable, V component.Component] struct { + components map[K]*Component[V] +} -// LoadOrStore returns the already created instance if exists, otherwise creates a new instance -// and adds it to the map of references. -func (m Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) { - if c, ok := m[key]; ok { +func (m *mapImpl[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) { + if c, ok := m.components[key]; ok { // If we haven't already seen this telemetry settings, this shared component represents // another instance. Wrap ReportComponentStatus to report for all instances this shared // component represents. @@ -44,14 +56,14 @@ func (m Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySetting newComp := &Component[V]{ component: comp, removeFunc: func() { - delete(m, key) + delete(m.components, key) }, telemetry: telemetrySettings, seenSettings: map[*component.TelemetrySettings]struct{}{ telemetrySettings: {}, }, } - m[key] = newComp + m.components[key] = newComp return newComp, nil } diff --git a/internal/sharedcomponent/sharedcomponent_test.go b/internal/sharedcomponent/sharedcomponent_test.go index 78cc614652b..767c7fa02c2 100644 --- a/internal/sharedcomponent/sharedcomponent_test.go +++ b/internal/sharedcomponent/sharedcomponent_test.go @@ -23,14 +23,14 @@ type baseComponent struct { telemetry *component.TelemetrySettings } -func TestNewSharedComponents(t *testing.T) { - comps := Map[component.ID, *baseComponent]{} - assert.Len(t, comps, 0) +func TestNewMap(t *testing.T) { + comps := NewMap[component.ID, *baseComponent]() + assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 0) } func TestNewSharedComponentsCreateError(t *testing.T) { - comps := Map[component.ID, *baseComponent]{} - assert.Len(t, comps, 0) + comps := NewMap[component.ID, *baseComponent]().(*mapImpl[component.ID, *baseComponent]) + assert.Len(t, comps.components, 0) myErr := errors.New("my error") _, err := comps.LoadOrStore( id, @@ -38,20 +38,20 @@ func TestNewSharedComponentsCreateError(t *testing.T) { newNopTelemetrySettings(), ) assert.ErrorIs(t, err, myErr) - assert.Len(t, comps, 0) + assert.Len(t, comps.components, 0) } func TestSharedComponentsLoadOrStore(t *testing.T) { nop := &baseComponent{} - comps := Map[component.ID, *baseComponent]{} + comps := NewMap[component.ID, *baseComponent]() got, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nop, nil }, newNopTelemetrySettings(), ) require.NoError(t, err) - assert.Len(t, comps, 1) + assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 1) assert.Same(t, nop, got.Unwrap()) gotSecond, err := comps.LoadOrStore( id, @@ -64,7 +64,7 @@ func TestSharedComponentsLoadOrStore(t *testing.T) { // Shutdown nop will remove assert.NoError(t, got.Shutdown(context.Background())) - assert.Len(t, comps, 0) + assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 0) gotThird, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nop, nil }, @@ -88,7 +88,7 @@ func TestSharedComponent(t *testing.T) { return wantErr }} - comps := Map[component.ID, *baseComponent]{} + comps := NewMap[component.ID, *baseComponent]() got, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return comp, nil }, @@ -122,7 +122,7 @@ func TestSharedComponentsReportStatus(t *testing.T) { } comp := &baseComponent{} - comps := Map[component.ID, *baseComponent]{} + comps := NewMap[component.ID, *baseComponent]() var telemetrySettings *component.TelemetrySettings // make a shared component that represents three instances @@ -141,7 +141,7 @@ func TestSharedComponentsReportStatus(t *testing.T) { telemetrySettings, ) require.NoError(t, err) - assert.Len(t, comps, 1) + assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 1) assert.Same(t, comp, got.Unwrap()) } @@ -245,7 +245,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) { return tc.shutdownErr } } - comps := Map[component.ID, *baseComponent]{} + comps := NewMap[component.ID, *baseComponent]() var comp *Component[*baseComponent] var err error for i := 0; i < 3; i++ { diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index ef20d592c13..125c7c9f296 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -141,4 +141,4 @@ func createLog( // create separate objects, they must use one otlpReceiver object per configuration. // When the receiver is shutdown it should be removed from this map so the same configuration // can be recreated successfully. -var receivers = sharedcomponent.Map[*Config, *otlpReceiver]{} +var receivers = sharedcomponent.NewMap[*Config, *otlpReceiver]() From 32fdc29bd91cdb40903fc7da2570c60503e74718 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 20 Dec 2023 21:40:58 -0800 Subject: [PATCH 6/6] remove interface --- internal/sharedcomponent/sharedcomponent.go | 18 +++++++----------- .../sharedcomponent/sharedcomponent_test.go | 10 +++++----- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index 098de1f5b70..417137a8919 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -13,24 +13,20 @@ import ( "go.opentelemetry.io/collector/component" ) -// Map keeps reference of all created instances for a given shared key such as a component configuration. -type Map[K comparable, V component.Component] interface { - // LoadOrStore returns the already created instance if exists, otherwise creates a new instance - // and adds it to the map of references. - LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) -} - -func NewMap[K comparable, V component.Component]() Map[K, V] { - return &mapImpl[K, V]{ +func NewMap[K comparable, V component.Component]() *Map[K, V] { + return &Map[K, V]{ components: map[K]*Component[V]{}, } } -type mapImpl[K comparable, V component.Component] struct { +// Map keeps reference of all created instances for a given shared key such as a component configuration. +type Map[K comparable, V component.Component] struct { components map[K]*Component[V] } -func (m *mapImpl[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) { +// LoadOrStore returns the already created instance if exists, otherwise creates a new instance +// and adds it to the map of references. +func (m *Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) { if c, ok := m.components[key]; ok { // If we haven't already seen this telemetry settings, this shared component represents // another instance. Wrap ReportComponentStatus to report for all instances this shared diff --git a/internal/sharedcomponent/sharedcomponent_test.go b/internal/sharedcomponent/sharedcomponent_test.go index 767c7fa02c2..7254bcfcbf3 100644 --- a/internal/sharedcomponent/sharedcomponent_test.go +++ b/internal/sharedcomponent/sharedcomponent_test.go @@ -25,11 +25,11 @@ type baseComponent struct { func TestNewMap(t *testing.T) { comps := NewMap[component.ID, *baseComponent]() - assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 0) + assert.Len(t, comps.components, 0) } func TestNewSharedComponentsCreateError(t *testing.T) { - comps := NewMap[component.ID, *baseComponent]().(*mapImpl[component.ID, *baseComponent]) + comps := NewMap[component.ID, *baseComponent]() assert.Len(t, comps.components, 0) myErr := errors.New("my error") _, err := comps.LoadOrStore( @@ -51,7 +51,7 @@ func TestSharedComponentsLoadOrStore(t *testing.T) { newNopTelemetrySettings(), ) require.NoError(t, err) - assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 1) + assert.Len(t, comps.components, 1) assert.Same(t, nop, got.Unwrap()) gotSecond, err := comps.LoadOrStore( id, @@ -64,7 +64,7 @@ func TestSharedComponentsLoadOrStore(t *testing.T) { // Shutdown nop will remove assert.NoError(t, got.Shutdown(context.Background())) - assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 0) + assert.Len(t, comps.components, 0) gotThird, err := comps.LoadOrStore( id, func() (*baseComponent, error) { return nop, nil }, @@ -141,7 +141,7 @@ func TestSharedComponentsReportStatus(t *testing.T) { telemetrySettings, ) require.NoError(t, err) - assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 1) + assert.Len(t, comps.components, 1) assert.Same(t, comp, got.Unwrap()) }