Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embed merge support into the default ConfigProvider. #4637

Merged
merged 1 commit into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Replace ConfigMapProvider and ConfigUnmarshaler in collector settings by one simpler ConfigProvider (#4590)
- Remove deprecated consumererror.Combine (#4597)
- Remove `configmapprovider.NewDefault`, `configmapprovider.NewExpand`, `configmapprovider.NewMerge` (#4600)
- The merge functionality is now embedded into `service.NewConfigProvider` (#4637).
- Move `configtest.LoadConfig` and `configtest.LoadConfigAndValidate` to `servicetest` (#4606)
- Builder: Remove deprecated `include-core` flag (#4616)

Expand Down
77 changes: 60 additions & 17 deletions service/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,27 @@ type ConfigProvider interface {
}

type configProvider struct {
configMapProvider configmapprovider.Provider
cfgMapConverters []ConfigMapConverterFunc
configUnmarshaler configunmarshaler.ConfigUnmarshaler
configMapProviders []configmapprovider.Provider
cfgMapConverters []ConfigMapConverterFunc
configUnmarshaler configunmarshaler.ConfigUnmarshaler

sync.Mutex
ret configmapprovider.Retrieved
watcher chan error
}

// newConfigProvider returns a new ConfigProvider that provides the configuration using the given
// `configMapProvider` and the given `configUnmarshaler`.
func newConfigProvider(configMapProvider configmapprovider.Provider, configUnmarshaler configunmarshaler.ConfigUnmarshaler, cfgMapConverters ...ConfigMapConverterFunc) ConfigProvider {
// NewConfigProvider returns a new ConfigProvider that provides the configuration:
// * Retrieve the config.Map by merging all retrieved maps from all the configmapprovider.Provider in order.
// * Then applies all the ConfigMapConverterFunc in the given order.
// * Then unmarshalls the final config.Config using the given configunmarshaler.ConfigUnmarshaler.
//
// Notice: This API is experimental.
func NewConfigProvider(configMapProviders []configmapprovider.Provider, cfgMapConverters []ConfigMapConverterFunc, configUnmarshaler configunmarshaler.ConfigUnmarshaler) ConfigProvider {
return &configProvider{
configMapProvider: configMapProvider,
cfgMapConverters: cfgMapConverters,
configUnmarshaler: configUnmarshaler,
watcher: make(chan error, 1),
configMapProviders: configMapProviders,
cfgMapConverters: cfgMapConverters,
configUnmarshaler: configUnmarshaler,
watcher: make(chan error, 1),
}
}

Expand All @@ -90,11 +94,11 @@ type ConfigMapConverterFunc func(*config.Map) error

// NewDefaultConfigProvider returns the default ConfigProvider, and it creates configuration from a file
// defined by the given configFile and overwrites fields using properties.
func NewDefaultConfigProvider(configFileName string, properties []string, cfgMapConverters ...ConfigMapConverterFunc) ConfigProvider {
return newConfigProvider(
configprovider.NewDefaultMapProvider(configFileName, properties),
configunmarshaler.NewDefault(),
append(cfgMapConverters, configprovider.NewExpandConverter())...)
func NewDefaultConfigProvider(configFileName string, properties []string) ConfigProvider {
return NewConfigProvider(
[]configmapprovider.Provider{configmapprovider.NewFile(configFileName), configmapprovider.NewProperties(properties)},
[]ConfigMapConverterFunc{configprovider.NewExpandConverter()},
configunmarshaler.NewDefault())
}

func (cm *configProvider) Get(ctx context.Context, factories component.Factories) (*config.Config, error) {
Expand All @@ -104,7 +108,7 @@ func (cm *configProvider) Get(ctx context.Context, factories component.Factories
}

var err error
cm.ret, err = cm.configMapProvider.Retrieve(ctx, cm.onChange)
cm.ret, err = mergeRetrieve(ctx, cm.onChange, cm.configMapProviders)
if err != nil {
// Nothing to close, no valid retrieved value.
cm.ret = nil
Expand Down Expand Up @@ -156,5 +160,44 @@ func (cm *configProvider) closeIfNeeded(ctx context.Context) error {

func (cm *configProvider) Shutdown(ctx context.Context) error {
close(cm.watcher)
return multierr.Combine(cm.closeIfNeeded(ctx), cm.configMapProvider.Shutdown(ctx))
return multierr.Combine(cm.closeIfNeeded(ctx), mergeShutdown(ctx, cm.configMapProviders))
}

func mergeRetrieve(ctx context.Context, onChange func(*configmapprovider.ChangeEvent), providers []configmapprovider.Provider) (configmapprovider.Retrieved, error) {
var retrs []configmapprovider.Retrieved
retCfgMap := config.NewMap()
for _, p := range providers {
retr, err := p.Retrieve(ctx, onChange)
if err != nil {
return nil, err
}
cfgMap, err := retr.Get(ctx)
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
if err = retCfgMap.Merge(cfgMap); err != nil {
return nil, err
}
retrs = append(retrs, retr)
}
return configmapprovider.NewRetrieved(
func(ctx context.Context) (*config.Map, error) {
return retCfgMap, nil
},
configmapprovider.WithClose(func(ctxF context.Context) error {
var err error
for _, ret := range retrs {
err = multierr.Append(err, ret.Close(ctxF))
}
return err
}))
}

func mergeShutdown(ctx context.Context, providers []configmapprovider.Provider) error {
var errs error
for _, p := range providers {
errs = multierr.Append(errs, p.Shutdown(ctx))
}

return errs
}
72 changes: 46 additions & 26 deletions service/config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,25 @@ import (
"go.opentelemetry.io/collector/config/experimental/configsource"
)

type errConfigMapProvider struct {
ret *fakeRetrieved
err error
type mockProvider struct {
ret *fakeRetrieved
errR error
errS error
}

func (ecmp *errConfigMapProvider) Retrieve(_ context.Context, onChange func(*configmapprovider.ChangeEvent)) (configmapprovider.Retrieved, error) {
if ecmp.ret != nil {
ecmp.ret.onChange = onChange
func (m *mockProvider) Retrieve(_ context.Context, onChange func(*configmapprovider.ChangeEvent)) (configmapprovider.Retrieved, error) {
if m.errR != nil {
return nil, m.errR
}
if m.ret == nil {
return &fakeRetrieved{}, nil
}
return ecmp.ret, ecmp.err
m.ret.onChange = onChange
return m.ret, nil
}

func (ecmp *errConfigMapProvider) Shutdown(context.Context) error {
return nil
func (m *mockProvider) Shutdown(context.Context) error {
return m.errS
}

type errConfigUnmarshaler struct {
Expand All @@ -59,13 +64,22 @@ func (ecu *errConfigUnmarshaler) Unmarshal(*config.Map, component.Factories) (*c
type fakeRetrieved struct {
configmapprovider.Retrieved
retM *config.Map
errG error
errW error
errC error
onChange func(event *configmapprovider.ChangeEvent)
}

func (er *fakeRetrieved) Get(context.Context) (*config.Map, error) {
er.onChange(&configmapprovider.ChangeEvent{Error: er.errW})
if er.onChange != nil {
er.onChange(&configmapprovider.ChangeEvent{Error: er.errW})
}
if er.errG != nil {
return nil, er.errG
}
if er.retM == nil {
return config.NewMap(), nil
}
return er.retM, nil
}

Expand All @@ -79,7 +93,7 @@ func TestConfigProvider_Errors(t *testing.T) {

tests := []struct {
name string
parserProvider configmapprovider.Provider
parserProvider []configmapprovider.Provider
cfgMapConverters []ConfigMapConverterFunc
configUnmarshaler configunmarshaler.ConfigUnmarshaler
expectNewErr bool
Expand All @@ -88,57 +102,63 @@ func TestConfigProvider_Errors(t *testing.T) {
}{
{
name: "retrieve_err",
parserProvider: &errConfigMapProvider{err: errors.New("retrieve_err")},
parserProvider: []configmapprovider.Provider{&mockProvider{}, &mockProvider{errR: errors.New("retrieve_err")}},
configUnmarshaler: configunmarshaler.NewDefault(),
expectNewErr: true,
},
{
name: "get_err",
parserProvider: []configmapprovider.Provider{&mockProvider{}, &mockProvider{ret: &fakeRetrieved{errG: errors.New("retrieve_err")}}},
configUnmarshaler: configunmarshaler.NewDefault(),
expectNewErr: true,
},
{
name: "converter_err",
parserProvider: configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml")),
parserProvider: []configmapprovider.Provider{&mockProvider{}, configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml"))},
cfgMapConverters: []ConfigMapConverterFunc{func(c *config.Map) error { return errors.New("converter_err") }},
configUnmarshaler: configunmarshaler.NewDefault(),
expectNewErr: true,
},
{
name: "unmarshal_err",
parserProvider: configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml")),
parserProvider: []configmapprovider.Provider{&mockProvider{}, configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml"))},
configUnmarshaler: &errConfigUnmarshaler{err: errors.New("unmarshal_err")},
expectNewErr: true,
},
{
name: "validation_err",
parserProvider: configmapprovider.NewFile(path.Join("testdata", "otelcol-invalid.yaml")),
parserProvider: []configmapprovider.Provider{&mockProvider{}, configmapprovider.NewFile(path.Join("testdata", "otelcol-invalid.yaml"))},
configUnmarshaler: configunmarshaler.NewDefault(),
expectNewErr: true,
},
{
name: "watch_err",
parserProvider: func() configmapprovider.Provider {
parserProvider: func() []configmapprovider.Provider {
ret, err := configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml")).Retrieve(context.Background(), nil)
require.NoError(t, err)
m, err := ret.Get(context.Background())
require.NoError(t, err)
return &errConfigMapProvider{ret: &fakeRetrieved{retM: m, errW: errors.New("watch_err")}}
return []configmapprovider.Provider{&mockProvider{}, &mockProvider{ret: &fakeRetrieved{retM: m, errW: errors.New("watch_err")}}}
}(),
configUnmarshaler: configunmarshaler.NewDefault(),
expectWatchErr: true,
},
{
name: "close_err",
parserProvider: func() configmapprovider.Provider {
parserProvider: func() []configmapprovider.Provider {
ret, err := configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml")).Retrieve(context.Background(), nil)
require.NoError(t, err)
m, err := ret.Get(context.Background())
require.NoError(t, err)
return &errConfigMapProvider{ret: &fakeRetrieved{retM: m, errC: errors.New("close_err")}}
return []configmapprovider.Provider{&mockProvider{}, &mockProvider{ret: &fakeRetrieved{retM: m, errC: errors.New("close_err")}}}
}(),
configUnmarshaler: configunmarshaler.NewDefault(),
expectShutdownErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfgW := newConfigProvider(tt.parserProvider, tt.configUnmarshaler, tt.cfgMapConverters...)
cfgW := NewConfigProvider(tt.parserProvider, tt.cfgMapConverters, tt.configUnmarshaler)
_, errN := cfgW.Get(context.Background(), factories)
if tt.expectNewErr {
assert.Error(t, errN)
Expand Down Expand Up @@ -166,16 +186,16 @@ func TestConfigProvider_Errors(t *testing.T) {
func TestConfigProvider(t *testing.T) {
factories, errF := componenttest.NopFactories()
require.NoError(t, errF)
parserProvider := func() configmapprovider.Provider {
configMapProvider := func() configmapprovider.Provider {
// Use fakeRetrieved with nil errors to have Watchable interface implemented.
ret, err := configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml")).Retrieve(context.Background(), nil)
require.NoError(t, err)
m, err := ret.Get(context.Background())
require.NoError(t, err)
return &errConfigMapProvider{ret: &fakeRetrieved{retM: m}}
return &mockProvider{ret: &fakeRetrieved{retM: m}}
}()

cfgW := newConfigProvider(parserProvider, configunmarshaler.NewDefault())
cfgW := NewConfigProvider([]configmapprovider.Provider{configMapProvider}, nil, configunmarshaler.NewDefault())
_, errN := cfgW.Get(context.Background(), factories)
assert.NoError(t, errN)

Expand All @@ -199,7 +219,7 @@ func TestConfigProviderNoWatcher(t *testing.T) {
require.NoError(t, errF)

watcherWG := sync.WaitGroup{}
cfgW := newConfigProvider(configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml")), configunmarshaler.NewDefault())
cfgW := NewConfigProvider([]configmapprovider.Provider{configmapprovider.NewFile(path.Join("testdata", "otelcol-nop.yaml"))}, nil, configunmarshaler.NewDefault())
_, errN := cfgW.Get(context.Background(), factories)
assert.NoError(t, errN)

Expand All @@ -225,11 +245,11 @@ func TestConfigProvider_ShutdownClosesWatch(t *testing.T) {
require.NoError(t, err)
m, err := ret.Get(context.Background())
require.NoError(t, err)
return &errConfigMapProvider{ret: &fakeRetrieved{retM: m, errW: configsource.ErrSessionClosed}}
return &mockProvider{ret: &fakeRetrieved{retM: m, errW: configsource.ErrSessionClosed}}
}()

watcherWG := sync.WaitGroup{}
cfgW := newConfigProvider(configMapProvider, configunmarshaler.NewDefault())
cfgW := NewConfigProvider([]configmapprovider.Provider{configMapProvider}, nil, configunmarshaler.NewDefault())
_, errN := cfgW.Get(context.Background(), factories)
assert.NoError(t, errN)

Expand Down
25 changes: 0 additions & 25 deletions service/internal/configprovider/default.go

This file was deleted.

Loading