Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix merge config map provider to close the watchers #4570

Merged
merged 1 commit into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
- `service.telemetry.metrics.level` and `service.telemetry.metrics.address`
should be used to configure collector self-metrics.

## 🧰 Bug fixes 🧰

- Fix merge config map provider to close the watchers #4570

## v0.41.0 Beta

## 🛑 Breaking changes 🛑
Expand Down
10 changes: 9 additions & 1 deletion config/configmapprovider/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewMerge(ps ...Provider) Provider {
}

func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
var retrs []Retrieved
retCfgMap := config.NewMap()
for _, p := range mp.providers {
retr, err := p.Retrieve(ctx, onChange)
Expand All @@ -49,8 +50,15 @@ func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeE
if err = retCfgMap.Merge(cfgMap); err != nil {
return nil, err
}
retrs = append(retrs, retr)
}
return &simpleRetrieved{confMap: retCfgMap}, nil
return &simpleRetrieved{confMap: retCfgMap, closeFunc: func(ctxF context.Context) error {
var err error
for _, ret := range retrs {
err = multierr.Append(err, ret.Close(ctxF))
}
return err
}}, nil
}

func (mp *mergeMapProvider) Shutdown(ctx context.Context) error {
Expand Down
17 changes: 13 additions & 4 deletions config/configmapprovider/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,35 @@ import (
)

func TestMerge_GetError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")})
pl := NewMerge(&errProvider{err: nil}, &errProvider{err: errors.New("my error")})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background(), nil)
assert.Error(t, err)
assert.Nil(t, cp)
}

func TestMerge_CloseError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")})
pl := NewMerge(&errProvider{err: nil}, &errProvider{closeErr: errors.New("my error")})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background(), nil)
assert.NoError(t, err)
assert.Error(t, cp.Close(context.Background()))
}

func TestMerge_ShutdownError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{err: errors.New("my error")})
require.NotNil(t, pl)
assert.Error(t, pl.Shutdown(context.Background()))
}

type errProvider struct {
err error
err error
closeErr error
}

func (epl *errProvider) Retrieve(context.Context, func(*ChangeEvent)) (Retrieved, error) {
if epl.err == nil {
return &simpleRetrieved{confMap: config.NewMap()}, nil
return &simpleRetrieved{confMap: config.NewMap(), closeFunc: func(context.Context) error { return epl.closeErr }}, nil
}
return nil, epl.err
}
Expand Down
10 changes: 8 additions & 2 deletions config/configmapprovider/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@ import (
"go.opentelemetry.io/collector/config"
)

type closeFunc func(ctx context.Context) error

// TODO: This probably will make sense to be exported, but needs better name and documentation.
type simpleRetrieved struct {
closeFunc
confMap *config.Map
}

func (sr *simpleRetrieved) Get(ctx context.Context) (*config.Map, error) {
func (sr *simpleRetrieved) Get(context.Context) (*config.Map, error) {
return sr.confMap, nil
}

func (sr *simpleRetrieved) Close(ctx context.Context) error {
return nil
if sr.closeFunc == nil {
return nil
}
return sr.closeFunc(ctx)
}