Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] polish sharedcomponent API #9157

Merged
merged 6 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 44 additions & 41 deletions internal/sharedcomponent/sharedcomponent.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package sharedcomponent exposes util functionality for receivers and exporters
// that need to share state between different signal types instances such as net.Listener or os.File.
// Package sharedcomponent exposes functionality for components
// to register against a shared key, such as a configuration object, in order to be reused across signal types.
// This is particularly useful when the component relies on a shared resource such as os.File or http.Server.
package sharedcomponent // import "go.opentelemetry.io/collector/internal/sharedcomponent"

import (
Expand All @@ -12,23 +13,25 @@ import (
"go.opentelemetry.io/collector/component"
)

// SharedComponents a map that keeps reference of all created instances for a given configuration,
// and ensures that the shared state is started and stopped only once.
type SharedComponents[K comparable, V component.Component] struct {
comps map[K]*SharedComponent[V]
// Map keeps reference of all created instances for a given shared key such as a component configuration.
type Map[K comparable, V component.Component] interface {
atoulme marked this conversation as resolved.
Show resolved Hide resolved
// LoadOrStore returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error)
}

// NewSharedComponents returns a new empty SharedComponents.
func NewSharedComponents[K comparable, V component.Component]() *SharedComponents[K, V] {
return &SharedComponents[K, V]{
comps: make(map[K]*SharedComponent[V]),
func NewMap[K comparable, V component.Component]() Map[K, V] {
return &mapImpl[K, V]{
components: map[K]*Component[V]{},
}
}

// GetOrAdd returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) {
if c, ok := scs.comps[key]; ok {
type mapImpl[K comparable, V component.Component] struct {
atoulme marked this conversation as resolved.
Show resolved Hide resolved
components map[K]*Component[V]
}

func (m *mapImpl[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*Component[V], error) {
if c, ok := m.components[key]; ok {
// If we haven't already seen this telemetry settings, this shared component represents
// another instance. Wrap ReportComponentStatus to report for all instances this shared
// component represents.
Comment on lines 31 to 33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure when this code was added, but I am confused that this is a duplicate of the graph instrumentation to report status for each component.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a subtle bug here that we need to review. If more than one error is produced by the successive reports, we ignore them.

Expand All @@ -50,23 +53,23 @@ func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), tel
return nil, err
}

newComp := &SharedComponent[V]{
newComp := &Component[V]{
component: comp,
removeFunc: func() {
delete(scs.comps, key)
delete(m.components, key)
},
telemetry: telemetrySettings,
seenSettings: map[*component.TelemetrySettings]struct{}{
telemetrySettings: {},
},
}
scs.comps[key] = newComp
m.components[key] = newComp
return newComp, nil
}

// SharedComponent ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the SharedComponents map.
type SharedComponent[V component.Component] struct {
// Component ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the Map.
type Component[V component.Component] struct {
component V

startOnce sync.Once
Expand All @@ -78,42 +81,42 @@ type SharedComponent[V component.Component] struct {
}

// Unwrap returns the original component.
func (r *SharedComponent[V]) Unwrap() V {
return r.component
func (c *Component[V]) Unwrap() V {
return c.component
}

// Start implements component.Component.
func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) error {
// Start starts the underlying component if it never started before.
func (c *Component[V]) Start(ctx context.Context, host component.Host) error {
var err error
r.startOnce.Do(func() {
// It's important that status for a sharedcomponent is reported through its
// telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates
c.startOnce.Do(func() {
// It's important that status for a shared component is reported through its
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// status reporting in graph a no-op.
_ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting))
if err = r.component.Start(ctx, host); err != nil {
_ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting))
if err = c.component.Start(ctx, host); err != nil {
_ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
}
})
return err
}

// Shutdown implements component.Component.
func (r *SharedComponent[V]) Shutdown(ctx context.Context) error {
// Shutdown shuts down the underlying component.
func (c *Component[V]) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
// It's important that status for a sharedcomponent is reported through its
// telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates
c.stopOnce.Do(func() {
// It's important that status for a shared component is reported through its
// telemetry settings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// the status reporting in graph a no-op.
_ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping))
err = r.component.Shutdown(ctx)
// status reporting in graph a no-op.
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping))
err = c.component.Shutdown(ctx)
if err != nil {
_ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
_ = c.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
} else {
_ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped))
_ = c.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped))
}
r.removeFunc()
c.removeFunc()
})
return err
}
50 changes: 25 additions & 25 deletions internal/sharedcomponent/sharedcomponent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,37 @@ type baseComponent struct {
telemetry *component.TelemetrySettings
}

func TestNewSharedComponents(t *testing.T) {
comps := NewSharedComponents[component.ID, *baseComponent]()
assert.Len(t, comps.comps, 0)
func TestNewMap(t *testing.T) {
comps := NewMap[component.ID, *baseComponent]()
assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 0)
}

func TestNewSharedComponentsCreateError(t *testing.T) {
comps := NewSharedComponents[component.ID, *baseComponent]()
assert.Len(t, comps.comps, 0)
comps := NewMap[component.ID, *baseComponent]().(*mapImpl[component.ID, *baseComponent])
assert.Len(t, comps.components, 0)
myErr := errors.New("my error")
_, err := comps.GetOrAdd(
_, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nil, myErr },
newNopTelemetrySettings(),
)
assert.ErrorIs(t, err, myErr)
assert.Len(t, comps.comps, 0)
assert.Len(t, comps.components, 0)
}

func TestSharedComponentsGetOrAdd(t *testing.T) {
func TestSharedComponentsLoadOrStore(t *testing.T) {
nop := &baseComponent{}

comps := NewSharedComponents[component.ID, *baseComponent]()
got, err := comps.GetOrAdd(
comps := NewMap[component.ID, *baseComponent]()
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nop, nil },
newNopTelemetrySettings(),
)
require.NoError(t, err)
assert.Len(t, comps.comps, 1)
assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 1)
assert.Same(t, nop, got.Unwrap())
gotSecond, err := comps.GetOrAdd(
gotSecond, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { panic("should not be called") },
newNopTelemetrySettings(),
Expand All @@ -64,8 +64,8 @@ func TestSharedComponentsGetOrAdd(t *testing.T) {

// Shutdown nop will remove
assert.NoError(t, got.Shutdown(context.Background()))
assert.Len(t, comps.comps, 0)
gotThird, err := comps.GetOrAdd(
assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 0)
gotThird, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nop, nil },
newNopTelemetrySettings(),
Expand All @@ -88,8 +88,8 @@ func TestSharedComponent(t *testing.T) {
return wantErr
}}

comps := NewSharedComponents[component.ID, *baseComponent]()
got, err := comps.GetOrAdd(
comps := NewMap[component.ID, *baseComponent]()
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
newNopTelemetrySettings(),
Expand All @@ -100,13 +100,13 @@ func TestSharedComponent(t *testing.T) {
// Second time is not called anymore.
assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// first time, shutdown is called.
assert.Equal(t, wantErr, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
// Second time is not called anymore.
assert.NoError(t, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
}

func TestSharedComponentsReportStatus(t *testing.T) {
reportedStatuses := make(map[*component.InstanceID][]component.Status)
newStatusFunc := func() func(*component.StatusEvent) error {
Expand All @@ -122,31 +122,31 @@ func TestSharedComponentsReportStatus(t *testing.T) {
}

comp := &baseComponent{}
comps := NewSharedComponents[component.ID, *baseComponent]()
comps := NewMap[component.ID, *baseComponent]()
var telemetrySettings *component.TelemetrySettings

// make a shared component that represents three instances
for i := 0; i < 3; i++ {
telemetrySettings = newNopTelemetrySettings()
telemetrySettings.ReportComponentStatus = newStatusFunc()
// The initial settings for the shared component need to match the ones passed to the first
// invocation of GetOrAdd so that underlying telemetry settings reference can be used to
// invocation of LoadOrStore so that underlying telemetry settings reference can be used to
// wrap ReportComponentStatus for subsequently added "instances".
if i == 0 {
comp.telemetry = telemetrySettings
}
got, err := comps.GetOrAdd(
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
telemetrySettings,
)
require.NoError(t, err)
assert.Len(t, comps.comps, 1)
assert.Len(t, comps.(*mapImpl[component.ID, *baseComponent]).components, 1)
assert.Same(t, comp, got.Unwrap())
}

// make sure we don't try to represent a fourth instance if we reuse a telemetrySettings
_, _ = comps.GetOrAdd(
_, _ = comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
telemetrySettings,
Expand Down Expand Up @@ -245,16 +245,16 @@ func TestReportStatusOnStartShutdown(t *testing.T) {
return tc.shutdownErr
}
}
comps := NewSharedComponents[component.ID, *baseComponent]()
var comp *SharedComponent[*baseComponent]
comps := NewMap[component.ID, *baseComponent]()
var comp *Component[*baseComponent]
var err error
for i := 0; i < 3; i++ {
telemetrySettings := newNopTelemetrySettings()
telemetrySettings.ReportComponentStatus = newStatusFunc()
if i == 0 {
base.telemetry = telemetrySettings
}
comp, err = comps.GetOrAdd(
comp, err = comps.LoadOrStore(
id,
func() (*baseComponent, error) { return base, nil },
telemetrySettings,
Expand Down
8 changes: 4 additions & 4 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func createTraces(
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -93,7 +93,7 @@ func createMetrics(
consumer consumer.Metrics,
) (receiver.Metrics, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -118,7 +118,7 @@ func createLog(
consumer consumer.Logs,
) (receiver.Logs, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -141,4 +141,4 @@ func createLog(
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = sharedcomponent.NewSharedComponents[*Config, *otlpReceiver]()
var receivers = sharedcomponent.NewMap[*Config, *otlpReceiver]()
Loading