Skip to content

Commit

Permalink
Fix merge config map provider to close the watchers (#4570)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Dec 17, 2021
1 parent 165fe8c commit 3eb11e0
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
- `service.telemetry.metrics.level` and `service.telemetry.metrics.address`
should be used to configure collector self-metrics.

## 🧰 Bug fixes 🧰

- Fix merge config map provider to close the watchers #4570

## v0.41.0 Beta

## 🛑 Breaking changes 🛑
Expand Down
10 changes: 9 additions & 1 deletion config/configmapprovider/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewMerge(ps ...Provider) Provider {
}

func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
var retrs []Retrieved
retCfgMap := config.NewMap()
for _, p := range mp.providers {
retr, err := p.Retrieve(ctx, onChange)
Expand All @@ -49,8 +50,15 @@ func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeE
if err = retCfgMap.Merge(cfgMap); err != nil {
return nil, err
}
retrs = append(retrs, retr)
}
return &simpleRetrieved{confMap: retCfgMap}, nil
return &simpleRetrieved{confMap: retCfgMap, closeFunc: func(ctxF context.Context) error {
var err error
for _, ret := range retrs {
err = multierr.Append(err, ret.Close(ctxF))
}
return err
}}, nil
}

func (mp *mergeMapProvider) Shutdown(ctx context.Context) error {
Expand Down
17 changes: 13 additions & 4 deletions config/configmapprovider/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,35 @@ import (
)

func TestMerge_GetError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")})
pl := NewMerge(&errProvider{err: nil}, &errProvider{err: errors.New("my error")})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background(), nil)
assert.Error(t, err)
assert.Nil(t, cp)
}

func TestMerge_CloseError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")})
pl := NewMerge(&errProvider{err: nil}, &errProvider{closeErr: errors.New("my error")})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background(), nil)
assert.NoError(t, err)
assert.Error(t, cp.Close(context.Background()))
}

func TestMerge_ShutdownError(t *testing.T) {
pl := NewMerge(&errProvider{err: nil}, &errProvider{err: errors.New("my error")})
require.NotNil(t, pl)
assert.Error(t, pl.Shutdown(context.Background()))
}

type errProvider struct {
err error
err error
closeErr error
}

func (epl *errProvider) Retrieve(context.Context, func(*ChangeEvent)) (Retrieved, error) {
if epl.err == nil {
return &simpleRetrieved{confMap: config.NewMap()}, nil
return &simpleRetrieved{confMap: config.NewMap(), closeFunc: func(context.Context) error { return epl.closeErr }}, nil
}
return nil, epl.err
}
Expand Down
10 changes: 8 additions & 2 deletions config/configmapprovider/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@ import (
"go.opentelemetry.io/collector/config"
)

type closeFunc func(ctx context.Context) error

// TODO: This probably will make sense to be exported, but needs better name and documentation.
type simpleRetrieved struct {
closeFunc
confMap *config.Map
}

func (sr *simpleRetrieved) Get(ctx context.Context) (*config.Map, error) {
func (sr *simpleRetrieved) Get(context.Context) (*config.Map, error) {
return sr.confMap, nil
}

func (sr *simpleRetrieved) Close(ctx context.Context) error {
return nil
if sr.closeFunc == nil {
return nil
}
return sr.closeFunc(ctx)
}

0 comments on commit 3eb11e0

Please sign in to comment.