Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] polish sharedcomponent API #9157

Merged
merged 6 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 27 additions & 26 deletions internal/sharedcomponent/sharedcomponent.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,26 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package sharedcomponent exposes util functionality for receivers and exporters
// that need to share state between different signal types instances such as net.Listener or os.File.
// Package sharedcomponent exposes functionality for components
// to register against a shared key, such as a configuration object, in order to be reused across signal types.
// This is particularly useful when the component relies on a shared resource such as os.File or http.Server.
package sharedcomponent // import "go.opentelemetry.io/collector/internal/sharedcomponent"

import (
"context"
"sync"
"sync/atomic"

"go.opentelemetry.io/collector/component"
)

// SharedComponents a map that keeps reference of all created instances for a given configuration,
// and ensures that the shared state is started and stopped only once.
type SharedComponents[K comparable, V component.Component] struct {
comps map[K]*SharedComponent[V]
}

// NewSharedComponents returns a new empty SharedComponents.
func NewSharedComponents[K comparable, V component.Component]() *SharedComponents[K, V] {
return &SharedComponents[K, V]{
comps: make(map[K]*SharedComponent[V]),
}
}
// Map keeps reference of all created instances for a given shared key such as a component configuration.
type Map[K comparable, V component.Component] map[K]*SharedComponent[V]

// GetOrAdd returns the already created instance if exists, otherwise creates a new instance
// LoadOrStore returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) {
if c, ok := scs.comps[key]; ok {
func (scs Map[K, V]) LoadOrStore(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) {
if c, ok := scs[key]; ok {
// If we haven't already seen this telemetry settings, this shared component represents
// another instance. Wrap ReportComponentStatus to report for all instances this shared
// component represents.
Comment on lines 31 to 33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure when this code was added, but I am confused that this is a duplicate of the graph instrumentation to report status for each component.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a subtle bug here that we need to review. If more than one error is produced by the successive reports, we ignore them.

Expand All @@ -53,25 +45,27 @@ func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), tel
newComp := &SharedComponent[V]{
component: comp,
removeFunc: func() {
delete(scs.comps, key)
delete(scs, key)
},
telemetry: telemetrySettings,
seenSettings: map[*component.TelemetrySettings]struct{}{
telemetrySettings: {},
},
activeCount: &atomic.Int32{},
}
scs.comps[key] = newComp
scs[key] = newComp
return newComp, nil
}

// SharedComponent ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the SharedComponents map.
// When stopped it is removed from the Map.
type SharedComponent[V component.Component] struct {
atoulme marked this conversation as resolved.
Show resolved Hide resolved
component V

startOnce sync.Once
stopOnce sync.Once
removeFunc func()
activeCount *atomic.Int32 // a counter keeping track of the number of active uses of the component
startOnce sync.Once
stopOnce sync.Once
removeFunc func()

telemetry *component.TelemetrySettings
seenSettings map[*component.TelemetrySettings]struct{}
Expand All @@ -82,7 +76,8 @@ func (r *SharedComponent[V]) Unwrap() V {
return r.component
}

// Start implements component.Component.
// Start starts the underlying component if it never started before. Each call to Start is counted as an active usage.
// Shutdown will shut down the underlying component if called as many times as Start is called.
func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) error {
var err error
r.startOnce.Do(func() {
Expand All @@ -95,17 +90,23 @@ func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) err
_ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err))
}
})
r.activeCount.Add(1)
atoulme marked this conversation as resolved.
Show resolved Hide resolved
return err
}

// Shutdown implements component.Component.
// Shutdown shuts down the underlying component if all known usages, measured by the number of times
// Start was called, are accounted for.
func (r *SharedComponent[V]) Shutdown(ctx context.Context) error {
if r.activeCount.Add(-1) > 0 {
return nil
}

var err error
r.stopOnce.Do(func() {
atoulme marked this conversation as resolved.
Show resolved Hide resolved
// It's important that status for a sharedcomponent is reported through its
// telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates
// and takes priority over the automated status reporting that happens in graph, making the
// the status reporting in graph a no-op.
// status reporting in graph a no-op.
_ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping))
err = r.component.Shutdown(ctx)
if err != nil {
Expand Down
82 changes: 61 additions & 21 deletions internal/sharedcomponent/sharedcomponent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,36 @@ type baseComponent struct {
}

func TestNewSharedComponents(t *testing.T) {
comps := NewSharedComponents[component.ID, *baseComponent]()
assert.Len(t, comps.comps, 0)
comps := Map[component.ID, *baseComponent]{}
assert.Len(t, comps, 0)
}

func TestNewSharedComponentsCreateError(t *testing.T) {
comps := NewSharedComponents[component.ID, *baseComponent]()
assert.Len(t, comps.comps, 0)
comps := Map[component.ID, *baseComponent]{}
assert.Len(t, comps, 0)
myErr := errors.New("my error")
_, err := comps.GetOrAdd(
_, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nil, myErr },
newNopTelemetrySettings(),
)
assert.ErrorIs(t, err, myErr)
assert.Len(t, comps.comps, 0)
assert.Len(t, comps, 0)
}

func TestSharedComponentsGetOrAdd(t *testing.T) {
nop := &baseComponent{}

comps := NewSharedComponents[component.ID, *baseComponent]()
got, err := comps.GetOrAdd(
comps := Map[component.ID, *baseComponent]{}
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nop, nil },
newNopTelemetrySettings(),
)
require.NoError(t, err)
assert.Len(t, comps.comps, 1)
assert.Len(t, comps, 1)
assert.Same(t, nop, got.Unwrap())
gotSecond, err := comps.GetOrAdd(
gotSecond, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { panic("should not be called") },
newNopTelemetrySettings(),
Expand All @@ -64,8 +64,8 @@ func TestSharedComponentsGetOrAdd(t *testing.T) {

// Shutdown nop will remove
assert.NoError(t, got.Shutdown(context.Background()))
assert.Len(t, comps.comps, 0)
gotThird, err := comps.GetOrAdd(
assert.Len(t, comps, 0)
gotThird, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return nop, nil },
newNopTelemetrySettings(),
Expand All @@ -88,8 +88,8 @@ func TestSharedComponent(t *testing.T) {
return wantErr
}}

comps := NewSharedComponents[component.ID, *baseComponent]()
got, err := comps.GetOrAdd(
comps := Map[component.ID, *baseComponent]{}
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
newNopTelemetrySettings(),
Expand All @@ -100,9 +100,49 @@ func TestSharedComponent(t *testing.T) {
// Second time is not called anymore.
assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// first time, shutdown is not called.
assert.NoError(t, got.Shutdown(context.Background()))
assert.Equal(t, 0, calledStop)
// Second time is not called anymore.
assert.Equal(t, wantErr, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
}

func TestSharedComponentLateShutdown(t *testing.T) {
calledStart := 0
calledStop := 0
comp := &baseComponent{
StartFunc: func(ctx context.Context, host component.Host) error {
calledStart++
return nil
},
ShutdownFunc: func(ctx context.Context) error {
calledStop++
return nil
}}

comps := Map[component.ID, *baseComponent]{}
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
newNopTelemetrySettings(),
)
require.NoError(t, err)
assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// Second time is not called anymore.
assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// there is still one use, so stop is not called.
assert.NoError(t, got.Shutdown(context.Background()))
assert.Equal(t, 0, calledStop)
// start one more time:
assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// there is still one use, so stop is not called.
assert.NoError(t, got.Shutdown(context.Background()))
assert.Equal(t, 0, calledStop)
// finally close all active uses
assert.NoError(t, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
}
Expand All @@ -122,31 +162,31 @@ func TestSharedComponentsReportStatus(t *testing.T) {
}

comp := &baseComponent{}
comps := NewSharedComponents[component.ID, *baseComponent]()
comps := Map[component.ID, *baseComponent]{}
var telemetrySettings *component.TelemetrySettings

// make a shared component that represents three instances
for i := 0; i < 3; i++ {
telemetrySettings = newNopTelemetrySettings()
telemetrySettings.ReportComponentStatus = newStatusFunc()
// The initial settings for the shared component need to match the ones passed to the first
// invocation of GetOrAdd so that underlying telemetry settings reference can be used to
// invocation of LoadOrStore so that underlying telemetry settings reference can be used to
// wrap ReportComponentStatus for subsequently added "instances".
if i == 0 {
comp.telemetry = telemetrySettings
}
got, err := comps.GetOrAdd(
got, err := comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
telemetrySettings,
)
require.NoError(t, err)
assert.Len(t, comps.comps, 1)
assert.Len(t, comps, 1)
assert.Same(t, comp, got.Unwrap())
}

// make sure we don't try to represent a fourth instance if we reuse a telemetrySettings
_, _ = comps.GetOrAdd(
_, _ = comps.LoadOrStore(
id,
func() (*baseComponent, error) { return comp, nil },
telemetrySettings,
Expand Down Expand Up @@ -245,7 +285,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) {
return tc.shutdownErr
}
}
comps := NewSharedComponents[component.ID, *baseComponent]()
comps := Map[component.ID, *baseComponent]{}
var comp *SharedComponent[*baseComponent]
var err error
for i := 0; i < 3; i++ {
Expand All @@ -254,7 +294,7 @@ func TestReportStatusOnStartShutdown(t *testing.T) {
if i == 0 {
base.telemetry = telemetrySettings
}
comp, err = comps.GetOrAdd(
comp, err = comps.LoadOrStore(
id,
func() (*baseComponent, error) { return base, nil },
telemetrySettings,
Expand Down
8 changes: 4 additions & 4 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func createTraces(
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -93,7 +93,7 @@ func createMetrics(
consumer consumer.Metrics,
) (receiver.Metrics, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -118,7 +118,7 @@ func createLog(
consumer consumer.Logs,
) (receiver.Logs, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(
r, err := receivers.LoadOrStore(
oCfg,
func() (*otlpReceiver, error) {
return newOtlpReceiver(oCfg, &set)
Expand All @@ -141,4 +141,4 @@ func createLog(
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = sharedcomponent.NewSharedComponents[*Config, *otlpReceiver]()
var receivers = sharedcomponent.Map[*Config, *otlpReceiver]{}
Loading