diff --git a/CHANGELOG.md b/CHANGELOG.md index 14702312833..a85fa2fb7d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ - `service.telemetry.metrics.level` and `service.telemetry.metrics.address` should be used to configure collector self-metrics. +## 🧰 Bug fixes 🧰 + +- Fix merge config map provider to close the watchers #4570 + ## v0.41.0 Beta ## 🛑 Breaking changes 🛑 diff --git a/config/configmapprovider/merge.go b/config/configmapprovider/merge.go index 98b0b339686..e6fc5d3fde2 100644 --- a/config/configmapprovider/merge.go +++ b/config/configmapprovider/merge.go @@ -36,6 +36,7 @@ func NewMerge(ps ...Provider) Provider { } func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeEvent)) (Retrieved, error) { + var retrs []Retrieved retCfgMap := config.NewMap() for _, p := range mp.providers { retr, err := p.Retrieve(ctx, onChange) @@ -49,8 +50,15 @@ func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeE if err = retCfgMap.Merge(cfgMap); err != nil { return nil, err } + retrs = append(retrs, retr) } - return &simpleRetrieved{confMap: retCfgMap}, nil + return &simpleRetrieved{confMap: retCfgMap, closeFunc: func(ctxF context.Context) error { + var err error + for _, ret := range retrs { + err = multierr.Append(err, ret.Close(ctxF)) + } + return err + }}, nil } func (mp *mergeMapProvider) Shutdown(ctx context.Context) error { diff --git a/config/configmapprovider/merge_test.go b/config/configmapprovider/merge_test.go index c556b4e00cc..a5a06e0a753 100644 --- a/config/configmapprovider/merge_test.go +++ b/config/configmapprovider/merge_test.go @@ -26,7 +26,7 @@ import ( ) func TestMerge_GetError(t *testing.T) { - pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")}) + pl := NewMerge(&errProvider{err: nil}, &errProvider{err: errors.New("my error")}) require.NotNil(t, pl) cp, err := pl.Retrieve(context.Background(), nil) assert.Error(t, err) @@ -34,18 +34,27 @@ func TestMerge_GetError(t *testing.T) { } func TestMerge_CloseError(t *testing.T) { - pl := NewMerge(&errProvider{err: nil}, &errProvider{errors.New("my error")}) + pl := NewMerge(&errProvider{err: nil}, &errProvider{closeErr: errors.New("my error")}) + require.NotNil(t, pl) + cp, err := pl.Retrieve(context.Background(), nil) + assert.NoError(t, err) + assert.Error(t, cp.Close(context.Background())) +} + +func TestMerge_ShutdownError(t *testing.T) { + pl := NewMerge(&errProvider{err: nil}, &errProvider{err: errors.New("my error")}) require.NotNil(t, pl) assert.Error(t, pl.Shutdown(context.Background())) } type errProvider struct { - err error + err error + closeErr error } func (epl *errProvider) Retrieve(context.Context, func(*ChangeEvent)) (Retrieved, error) { if epl.err == nil { - return &simpleRetrieved{confMap: config.NewMap()}, nil + return &simpleRetrieved{confMap: config.NewMap(), closeFunc: func(context.Context) error { return epl.closeErr }}, nil } return nil, epl.err } diff --git a/config/configmapprovider/simple.go b/config/configmapprovider/simple.go index e289e795002..dda6c0a1de6 100644 --- a/config/configmapprovider/simple.go +++ b/config/configmapprovider/simple.go @@ -20,15 +20,21 @@ import ( "go.opentelemetry.io/collector/config" ) +type closeFunc func(ctx context.Context) error + // TODO: This probably will make sense to be exported, but needs better name and documentation. type simpleRetrieved struct { + closeFunc confMap *config.Map } -func (sr *simpleRetrieved) Get(ctx context.Context) (*config.Map, error) { +func (sr *simpleRetrieved) Get(context.Context) (*config.Map, error) { return sr.confMap, nil } func (sr *simpleRetrieved) Close(ctx context.Context) error { - return nil + if sr.closeFunc == nil { + return nil + } + return sr.closeFunc(ctx) }