Skip to content

Commit

Permalink
includeconfigsource: add tests for watch file and template files (ope…
Browse files Browse the repository at this point in the history
…n-telemetry#1972)

* fix templating and add tests for watch file and template files

* add integration test for templated configs

* rename test file

* Update tests/general/include_config_source_test.go

Co-authored-by: Ryan Fitzpatrick <rmfitzpatrick@users.noreply.github.com>

* call cancel in subprocess shutdown

Co-authored-by: Ryan Fitzpatrick <rmfitzpatrick@users.noreply.github.com>
  • Loading branch information
ccordi and rmfitzpatrick authored Sep 22, 2022
1 parent 8375ec0 commit 070dbe7
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 1 deletion.
8 changes: 7 additions & 1 deletion internal/configsource/includeconfigsource/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,13 @@ func (is *includeConfigSource) Retrieve(_ context.Context, selector string, para
}

var buf bytes.Buffer
if err = tmpl.Execute(&buf, paramsConfigMap); err != nil {
var params map[string]any
if paramsConfigMap != nil {
params = paramsConfigMap.ToStringMap()
} else {
params = map[string]any{}
}
if err = tmpl.Execute(&buf, params); err != nil {
return nil, err
}

Expand Down
85 changes: 85 additions & 0 deletions internal/configsource/includeconfigsource/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func TestIncludeConfigSource_Session(t *testing.T) {
selector: "no_params_template",
expected: []byte("bool_field: true"),
},
{
name: "param_template",
selector: "param_template",
params: map[string]any{
"glob_pattern": "myPattern",
},
expected: []byte("logs_path: myPattern"),
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -82,6 +90,83 @@ func TestIncludeConfigSource_Session(t *testing.T) {
}
}

func TestIncludeConfigSource_WatchFileClose(t *testing.T) {
s, err := newConfigSource(configprovider.CreateParams{}, &Config{WatchFiles: true})
require.NoError(t, err)
require.NotNil(t, s)

ctx := context.Background()
defer func() {
assert.NoError(t, s.Close(ctx))
}()

// Write out an initial test file
f, err := os.CreateTemp("", "watch_file_test")
require.NoError(t, err)
defer func() {
require.NoError(t, os.Remove(f.Name()))
}()
_, err = f.Write([]byte("val1"))
require.NoError(t, err)
require.NoError(t, f.Close())

// Perform initial retrieve
r, err := s.Retrieve(ctx, f.Name(), nil)
require.NoError(t, err)
require.NotNil(t, r)
assert.Equal(t, []byte("val1"), r.Value())

watched, ok := r.(configsource.Watchable)
assert.True(t, ok)

// Close current source.
require.NoError(t, s.Close(context.Background()))
watcherErr := watched.WatchForUpdate()
require.ErrorIs(t, watcherErr, configsource.ErrSessionClosed)

}

func TestIncludeConfigSource_WatchFileUpdate(t *testing.T) {
s, err := newConfigSource(configprovider.CreateParams{}, &Config{WatchFiles: true})
require.NoError(t, err)
require.NotNil(t, s)

ctx := context.Background()
defer func() {
assert.NoError(t, s.Close(ctx))
}()

// Write out an initial test file
f, err := os.CreateTemp("", "watch_file_test")
require.NoError(t, err)
defer func() {
require.NoError(t, os.Remove(f.Name()))
}()
_, err = f.Write([]byte("val1"))
require.NoError(t, err)
require.NoError(t, f.Close())

// Perform initial retrieve
r, err := s.Retrieve(ctx, f.Name(), nil)
require.NoError(t, err)
require.NotNil(t, r)
assert.Equal(t, []byte("val1"), r.Value())
watched, ok := r.(configsource.Watchable)
assert.True(t, ok)

// Write update to file
err = os.WriteFile(f.Name(), []byte("val2"), 0600)
require.NoError(t, err)
watcherErr := watched.WatchForUpdate()
require.ErrorIs(t, watcherErr, configsource.ErrValueUpdated)

// Check updated file after waiting for update
r, err = s.Retrieve(ctx, f.Name(), nil)
require.NoError(t, err)
require.NotNil(t, r)
assert.Equal(t, []byte("val2"), r.Value())
}

func TestIncludeConfigSource_DeleteFile(t *testing.T) {
s, err := newConfigSource(configprovider.CreateParams{}, &Config{DeleteFiles: true})
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
logs_path: {{ .glob_pattern }}
111 changes: 111 additions & 0 deletions tests/general/include_config_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package tests

import (
"fmt"
"io"
"net/http"
"path"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
"gopkg.in/yaml.v2"

"github.com/signalfx/splunk-otel-collector/tests/testutils"
)

func TestCollectorProcessWithMultipleTemplateConfigs(t *testing.T) {
logCore, logs := observer.New(zap.DebugLevel)
logger := zap.New(logCore)
collector, err := testutils.NewCollectorProcess().
WithArgs("--config", path.Join(".", "testdata", "templated.yaml")).
WithLogger(logger).
Build()

require.NotNil(t, collector)
require.NoError(t, err)

err = collector.Start()
require.NoError(t, err)

require.Eventually(t, func() bool {
for _, log := range logs.All() {
if strings.Contains(log.Message,
`Set config to [testdata/templated.yaml]`,
) {
return true
}
}
return false
}, 20*time.Second, time.Second)

require.Eventually(t, func() bool {
for _, log := range logs.All() {
// Confirm collector starts and runs successfully
if strings.Contains(log.Message, "Everything is ready. Begin running and processing data.") {
return true
}
}
return false
}, 20*time.Second, time.Second)

expectedConfig := map[string]any{
"receivers": map[string]any{
"hostmetrics": map[string]any{
"collection_interval": "10s",
"scrapers": map[string]any{
"cpu": nil,
"disk": nil,
"filesystem": nil,
"memory": nil,
"network": nil,
},
},
},
"processors": map[string]any{
"resourcedetection": map[string]any{
"detectors": []any{"system"},
},
},
"exporters": map[string]any{
"otlp": map[string]any{
"endpoint": "localhost:23456",
"tls": map[string]any{
"insecure": true,
},
},
},
"service": map[string]any{
"pipelines": map[string]any{
"metrics": map[string]any{
"processors": []any{"resourcedetection"},
"receivers": []any{"hostmetrics"},
"exporters": []any{"otlp"},
},
},
},
}
for _, tc := range []struct {
expected map[string]any
endpoint string
}{
{expected: expectedConfig, endpoint: "effective"},
} {
resp, err := http.Get(fmt.Sprintf("http://localhost:55554/debug/configz/%s", tc.endpoint))
require.NoError(t, err)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)

actual := map[string]any{}
require.NoError(t, yaml.Unmarshal(body, &actual))

require.Equal(t, tc.expected, confmap.NewFromStringMap(actual).ToStringMap())
}

require.NoError(t, collector.Shutdown())
}
1 change: 1 addition & 0 deletions tests/general/testdata/collection_interval_scalar
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
10s
3 changes: 3 additions & 0 deletions tests/general/testdata/exporter_component
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
endpoint: localhost:23456
tls:
insecure: true
5 changes: 5 additions & 0 deletions tests/general/testdata/service_template_component
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pipelines:
metrics:
receivers: {{ .my_receivers }}
processors: {{ .my_processors }}
exporters: {{ .my_exporters }}
22 changes: 22 additions & 0 deletions tests/general/testdata/templated.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
config_sources:
include:

receivers:
hostmetrics:
scrapers:
filesystem:
memory:
network:
cpu:
disk:
collection_interval: ${include:./testdata/collection_interval_scalar}
processors:
resourcedetection:
detectors: [ system ]
exporters:
otlp: ${include:./testdata/exporter_component}
service: |
$include: ./testdata/service_template_component
my_receivers: [ hostmetrics ]
my_processors: [ resourcedetection ]
my_exporters: [ otlp ]
1 change: 1 addition & 0 deletions tests/testutils/subprocess/subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (subprocess *Subprocess) Shutdown(ctx context.Context) error {
if subprocess.cancel == nil {
return fmt.Errorf("no subprocess.cancel(). Has it been started properly?")
}
subprocess.cancel()

timeout := defaultShutdownTimeout
if subprocess.config.ShutdownTimeout != nil {
Expand Down

0 comments on commit 070dbe7

Please sign in to comment.