-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathconfig_provider.go
130 lines (111 loc) · 4.6 KB
/
config_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service // import "go.opentelemetry.io/collector/service"
import (
"context"
"fmt"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/mapconverter/expandmapconverter"
"go.opentelemetry.io/collector/config/mapprovider/envmapprovider"
"go.opentelemetry.io/collector/config/mapprovider/filemapprovider"
"go.opentelemetry.io/collector/config/mapprovider/yamlmapprovider"
"go.opentelemetry.io/collector/service/internal/configunmarshaler"
)
// ConfigProvider provides the service configuration.
//
// The typical usage is the following:
//
//	cfgProvider.Get(...)
//	cfgProvider.Watch() // wait for an event.
//	cfgProvider.Get(...)
//	cfgProvider.Watch() // wait for an event.
//	// repeat Get/Watch cycle until it is time to shut down the Collector process.
//	cfgProvider.Shutdown()
type ConfigProvider interface {
// Get returns the service configuration, or an error if the configuration
// cannot be provided.
//
// Should never be called concurrently with itself, Watch or Shutdown.
Get(ctx context.Context, factories component.Factories) (*Config, error)
// Watch blocks until any configuration change was detected or an unrecoverable error
// happened during monitoring the configuration changes.
//
// Error is nil if the configuration is changed and needs to be re-fetched. Any non-nil
// error indicates that there was a problem with watching the config changes.
//
// Should never be called concurrently with itself or Get.
Watch() <-chan error
// Shutdown signals that the provider is no longer in use and that it should close
// and release any resources that it may have created.
//
// This function must terminate the Watch channel.
//
// Should never be called concurrently with itself or Get.
Shutdown(ctx context.Context) error
}
// configProvider is the ConfigProvider implementation returned by NewConfigProvider.
// Its Get, Watch and Shutdown methods all delegate to the wrapped mapResolver.
type configProvider struct {
// mapResolver is the resolver created by NewConfigProvider from the
// ConfigProviderSettings; Get resolves the configuration map through it.
mapResolver *mapResolver
}
// ConfigProviderSettings are the settings to configure the behavior of the ConfigProvider.
type ConfigProviderSettings struct {
// Locations lists where the config.Map is retrieved from; the retrieved maps
// are merged in the given order.
// It is required to have at least one location.
Locations []string
// MapProviders is a map of pairs <scheme, config.MapProvider>.
// It is required to have at least one config.MapProvider.
MapProviders map[string]config.MapProvider
// MapConverters is a slice of config.MapConverterFunc.
MapConverters []config.MapConverterFunc
}
// newDefaultConfigProviderSettings builds the ConfigProviderSettings for the
// given locations, using the map providers created by filemapprovider,
// envmapprovider and yamlmapprovider, and a single converter created by
// expandmapconverter.
func newDefaultConfigProviderSettings(locations []string) ConfigProviderSettings {
	// Providers are constructed first and the converter last, in the same
	// order as the fields they populate.
	providers := makeMapProvidersMap(
		filemapprovider.New(),
		envmapprovider.New(),
		yamlmapprovider.New(),
	)
	converters := []config.MapConverterFunc{expandmapconverter.New()}

	var set ConfigProviderSettings
	set.Locations = locations
	set.MapProviders = providers
	set.MapConverters = converters
	return set
}
// NewConfigProvider returns a new ConfigProvider that provides the service configuration.
//
// The returned provider resolves the "configuration map" as follows:
//   - the config.Map is retrieved by merging all retrieved maps from the given
//     `locations` in order;
//   - all the config.MapConverterFunc are then applied in the given order;
//   - finally the config.Map is unmarshalled into the service Config.
//
// A non-nil error is returned, together with a nil ConfigProvider, when the
// map resolver cannot be created from the given settings.
func NewConfigProvider(set ConfigProviderSettings) (ConfigProvider, error) {
	var (
		provider configProvider
		err      error
	)
	if provider.mapResolver, err = newMapResolver(set.Locations, set.MapProviders, set.MapConverters); err != nil {
		return nil, err
	}
	return &provider, nil
}
// Get resolves the configuration map through the map resolver, unmarshals it
// into a Config using the given factories, and validates the result.
//
// It returns the validated Config, or a nil Config and an error wrapping the
// failure of whichever step (resolve, unmarshal, validate) went wrong.
func (cm *configProvider) Get(ctx context.Context, factories component.Factories) (*Config, error) {
	resolved, err := cm.mapResolver.Resolve(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot resolve the configuration: %w", err)
	}

	cfg, err := configunmarshaler.New().Unmarshal(resolved, factories)
	if err != nil {
		return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err)
	}

	// Only a configuration that passes validation is handed back to the caller.
	if validationErr := cfg.Validate(); validationErr != nil {
		return nil, fmt.Errorf("invalid configuration: %w", validationErr)
	}
	return cfg, nil
}
// Watch returns the channel obtained from the underlying map resolver's Watch.
func (cm *configProvider) Watch() <-chan error {
	events := cm.mapResolver.Watch()
	return events
}
// Shutdown forwards the shutdown request, with the given context, to the
// underlying map resolver and returns its error unchanged.
func (cm *configProvider) Shutdown(ctx context.Context) error {
	resolver := cm.mapResolver
	return resolver.Shutdown(ctx)
}