Skip to content

Commit

Permalink
Disallow direct implementation of retrieved
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Dec 17, 2021
1 parent 1941cc1 commit 684c3e2
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 128 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

## 🛑 Breaking changes 🛑

- Remove configmapprovider.NewInMemory() (#4507)
- Remove `configmapprovider.NewInMemory()` (#4507)
- Disallow direct implementation of `configmapprovider.Retrieved` (#4577)

## 💡 Enhancements 💡

Expand Down
4 changes: 3 additions & 1 deletion config/configmapprovider/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func (emp *envMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (Re
return nil, fmt.Errorf("unable to parse yaml: %w", err)
}

return &simpleRetrieved{confMap: config.NewMapFromStringMap(data)}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return config.NewMapFromStringMap(data), nil
})
}

func (*envMapProvider) Shutdown(context.Context) error {
Expand Down
6 changes: 5 additions & 1 deletion config/configmapprovider/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"os"

"go.opentelemetry.io/collector/config"
)

type expandMapProvider struct {
Expand All @@ -44,7 +46,9 @@ func (emp *expandMapProvider) Retrieve(ctx context.Context, onChange func(*Chang
for _, k := range cfgMap.AllKeys() {
cfgMap.Set(k, expandStringValues(cfgMap.Get(k)))
}
return &simpleRetrieved{confMap: cfgMap, closeFunc: retr.Close}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return cfgMap, nil
}, WithClose(retr.Close))
}

func (emp *expandMapProvider) Shutdown(ctx context.Context) error {
Expand Down
20 changes: 10 additions & 10 deletions config/configmapprovider/expand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,36 @@ func TestBaseRetrieveFailsOnRetrieve(t *testing.T) {
exp := NewExpand(&mockProvider{retrieveErr: retErr})
t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) })
_, err := exp.Retrieve(context.Background(), nil)
require.Error(t, err)
require.ErrorIs(t, err, retErr)
assert.Error(t, err)
assert.ErrorIs(t, err, retErr)
}

func TestBaseRetrieveFailsOnGet(t *testing.T) {
getErr := errors.New("test error")
exp := NewExpand(&mockProvider{retrieved: &mockRetrieved{getErr: getErr}})
exp := NewExpand(&mockProvider{retrieved: newErrGetRetrieved(getErr)})
t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) })
_, err := exp.Retrieve(context.Background(), nil)
require.Error(t, err)
require.ErrorIs(t, err, getErr)
assert.Error(t, err)
assert.ErrorIs(t, err, getErr)
}

func TestBaseRetrieveFailsOnClose(t *testing.T) {
closeErr := errors.New("test error")
exp := NewExpand(&mockProvider{retrieved: &mockRetrieved{closeErr: closeErr}})
exp := NewExpand(&mockProvider{retrieved: newErrCloseRetrieved(closeErr)})
t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) })
ret, err := exp.Retrieve(context.Background(), nil)
require.NoError(t, err)
err = ret.Close(context.Background())
require.Error(t, err)
require.ErrorIs(t, err, closeErr)
assert.Error(t, err)
assert.ErrorIs(t, err, closeErr)
}

func TestBaseRetrieveFailsOnShutdown(t *testing.T) {
shutdownErr := errors.New("test error")
exp := NewExpand(&mockProvider{shutdownErr: shutdownErr})
err := exp.Shutdown(context.Background())
require.Error(t, err)
require.ErrorIs(t, err, shutdownErr)
assert.Error(t, err)
assert.ErrorIs(t, err, shutdownErr)
}

func TestExpand(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion config/configmapprovider/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func (fmp *fileMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (R
return nil, fmt.Errorf("unable to parse yaml: %w", err)
}

return &simpleRetrieved{confMap: config.NewMapFromStringMap(data)}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return config.NewMapFromStringMap(data), nil
})
}

func (*fileMapProvider) Shutdown(context.Context) error {
Expand Down
18 changes: 11 additions & 7 deletions config/configmapprovider/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,17 @@ func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeE
}
retrs = append(retrs, retr)
}
return &simpleRetrieved{confMap: retCfgMap, closeFunc: func(ctxF context.Context) error {
var err error
for _, ret := range retrs {
err = multierr.Append(err, ret.Close(ctxF))
}
return err
}}, nil
return NewRetrieved(
func(ctx context.Context) (*config.Map, error) {
return retCfgMap, nil
},
WithClose(func(ctxF context.Context) error {
var err error
for _, ret := range retrs {
err = multierr.Append(err, ret.Close(ctxF))
}
return err
}))
}

func (mp *mergeMapProvider) Shutdown(ctx context.Context) error {
Expand Down
21 changes: 14 additions & 7 deletions config/configmapprovider/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,30 @@ import (
)

func TestMerge_GetError(t *testing.T) {
pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: &mockRetrieved{getErr: errors.New("my error")}})
getErr := errors.New("test error")
pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: newErrGetRetrieved(getErr)})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background(), nil)
_, err := pl.Retrieve(context.Background(), nil)
assert.Error(t, err)
assert.Nil(t, cp)
assert.ErrorIs(t, err, getErr)
}

func TestMerge_CloseError(t *testing.T) {
pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: &mockRetrieved{closeErr: errors.New("my error")}})
closeErr := errors.New("test error")
pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: newErrCloseRetrieved(closeErr)})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background(), nil)
assert.NoError(t, err)
assert.Error(t, cp.Close(context.Background()))
err = cp.Close(context.Background())
assert.Error(t, err)
assert.ErrorIs(t, err, closeErr)
}

func TestMerge_ShutdownError(t *testing.T) {
pl := NewMerge(&mockProvider{}, &mockProvider{shutdownErr: errors.New("my error")})
shutdownErr := errors.New("test error")
pl := NewMerge(&mockProvider{}, &mockProvider{shutdownErr: shutdownErr})
require.NotNil(t, pl)
assert.Error(t, pl.Shutdown(context.Background()))
err := pl.Shutdown(context.Background())
assert.Error(t, err)
assert.ErrorIs(t, err, shutdownErr)
}
28 changes: 9 additions & 19 deletions config/configmapprovider/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (m *mockProvider) Retrieve(context.Context, func(*ChangeEvent)) (Retrieved,
return nil, m.retrieveErr
}
if m.retrieved == nil {
return &mockRetrieved{}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) { return config.NewMap(), nil })
}
return m.retrieved, nil
}
Expand All @@ -43,24 +43,14 @@ func (m *mockProvider) Shutdown(context.Context) error {
return m.shutdownErr
}

type mockRetrieved struct {
cfg *config.Map
getErr error
closeErr error
func newErrGetRetrieved(getErr error) Retrieved {
ret, _ := NewRetrieved(func(ctx context.Context) (*config.Map, error) { return nil, getErr })
return ret
}

var _ Retrieved = &mockRetrieved{}

func (sr *mockRetrieved) Get(context.Context) (*config.Map, error) {
if sr.getErr != nil {
return nil, sr.getErr
}
if sr.cfg == nil {
return config.NewMap(), nil
}
return sr.cfg, nil
}

func (sr *mockRetrieved) Close(context.Context) error {
return sr.closeErr
func newErrCloseRetrieved(closeErr error) Retrieved {
ret, _ := NewRetrieved(
func(ctx context.Context) (*config.Map, error) { return config.NewMap(), nil },
WithClose(func(ctx context.Context) error { return closeErr }))
return ret
}
8 changes: 6 additions & 2 deletions config/configmapprovider/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func NewProperties(properties []string) Provider {

func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
if len(pmp.properties) == 0 {
return &simpleRetrieved{confMap: config.NewMap()}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return config.NewMap(), nil
})
}

b := &bytes.Buffer{}
Expand All @@ -67,7 +69,9 @@ func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*Cha
}
prop := maps.Unflatten(parsed, ".")

return &simpleRetrieved{confMap: config.NewMapFromStringMap(prop)}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return config.NewMapFromStringMap(prop), nil
})
}

func (*propertiesMapProvider) Shutdown(context.Context) error {
Expand Down
53 changes: 14 additions & 39 deletions config/configmapprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,25 @@ package configmapprovider // import "go.opentelemetry.io/collector/config/config

import (
"context"

"go.opentelemetry.io/collector/config"
)

// Provider is an interface that helps to retrieve a config map and watch for any
// changes to the config map. Implementations may load the config from a file,
// a database or any other source.
//
// The typical usage is the following:
//
// r := mapProvider.Retrieve()
// r.Get()
// // wait for onChange() to be called.
// r.Close()
// r = mapProvider.Retrieve()
// r.Get()
// // wait for onChange() to be called.
// r.Close()
// // repeat Retrieve/Get/wait/Close cycle until it is time to shut down the Collector process.
// // ...
// mapProvider.Shutdown()
type Provider interface {
// Retrieve goes to the configuration source and retrieves the selected data which
// contains the value to be injected in the configuration and the corresponding watcher that
Expand Down Expand Up @@ -50,43 +62,6 @@ type Provider interface {
Shutdown(ctx context.Context) error
}

// Retrieved holds the result of a call to the Retrieve method of a Provider object.
//
// The typical usage is the following:
//
// r := mapProvider.Retrieve()
// r.Get()
// // wait for onChange() to be called.
// r.Close()
// r = mapProvider.Retrieve()
// r.Get()
// // wait for onChange() to be called.
// r.Close()
// // repeat Retrieve/Get/wait/Close cycle until it is time to shut down the Collector process.
// // ...
// mapProvider.Shutdown()
type Retrieved interface {
// Get returns the config Map.
// If Close is called before Get or concurrently with Get then Get
// should return immediately with ErrSessionClosed error.
// Should never be called concurrently with itself.
// If ctx is cancelled should return immediately with an error.
Get(ctx context.Context) (*config.Map, error)

// Close signals that the configuration for which it was used to retrieve values is
// no longer in use and the object should close and release any watchers that it
// may have created.
//
// This method must be called when the service ends, either in case of success or error.
//
// Should never be called concurrently with itself.
// May be called before, after or concurrently with Get.
// If ctx is cancelled should return immediately with an error.
//
// Calling Close on an already closed object should have no effect and should return nil.
Close(ctx context.Context) error
}

// ChangeEvent describes the particular change event that happened with the config.
// TODO: see if this can be eliminated.
type ChangeEvent struct {
Expand Down
100 changes: 100 additions & 0 deletions config/configmapprovider/retrieved.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configmapprovider // import "go.opentelemetry.io/collector/config/configmapprovider"

import (
"context"
"errors"

"go.opentelemetry.io/collector/config"
)

// Retrieved holds the result of a call to the Retrieve method of a Provider object.
// This interface cannot be directly implemented. Implementations must use the NewRetrieved helper.
type Retrieved interface {
// Get returns the config Map.
// If Close is called before Get or concurrently with Get then Get
// should return immediately with ErrSessionClosed error.
// Should never be called concurrently with itself.
// If ctx is cancelled should return immediately with an error.
Get(ctx context.Context) (*config.Map, error)

// Close signals that the configuration for which it was used to retrieve values is
// no longer in use and the object should close and release any watchers that it
// may have created.
//
// This method must be called when the service ends, either in case of success or error.
//
// Should never be called concurrently with itself.
// May be called before, after or concurrently with Get.
// If ctx is cancelled should return immediately with an error.
//
// Calling Close on an already closed object should have no effect and should return nil.
Close(ctx context.Context) error

// privateRetrieved is an unexported func to disallow direct implementation.
privateRetrieved()
}

// GetFunc specifies the function invoked when the Retrieved.Get is being called.
type GetFunc func(context.Context) (*config.Map, error)

// Get implements the Retrieved.Get.
func (f GetFunc) Get(ctx context.Context) (*config.Map, error) {
return f(ctx)
}

// CloseFunc specifies the function invoked when the Retrieved.Close is being called.
type CloseFunc func(context.Context) error

// Close implements the Retrieved.Close.
func (f CloseFunc) Close(ctx context.Context) error {
if f == nil {
return nil
}
return f(ctx)
}

// RetrievedOption represents the possible options for NewRetrieved.
type RetrievedOption func(*retrieved)

// WithClose overrides the default `Close` function for a Retrieved.
// The default always returns nil.
func WithClose(closeFunc CloseFunc) RetrievedOption {
return func(o *retrieved) {
o.CloseFunc = closeFunc
}
}

type retrieved struct {
GetFunc
CloseFunc
}

func (retrieved) privateRetrieved() {}

// NewRetrieved returns a Retrieved configured with the provided options.
func NewRetrieved(getFunc GetFunc, options ...RetrievedOption) (Retrieved, error) {
if getFunc == nil {
return nil, errors.New("nil getFunc")
}
ret := &retrieved{
GetFunc: getFunc,
}
for _, op := range options {
op(ret)
}
return ret, nil
}
Loading

0 comments on commit 684c3e2

Please sign in to comment.