From fa688924762b489a5d233b609d73ddd2282d9a75 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 1 Jun 2021 14:20:40 -0400 Subject: [PATCH] Cleanup file input 2 (#165) * Split file_test.go into multiple files * Rename utils_test.go to util_test.go * Move TestBuild into config_test.go, and all rotation tests into rotation_test.go --- operator/builtin/input/file/benchmark_test.go | 84 ++ operator/builtin/input/file/config_test.go | 154 +++- operator/builtin/input/file/file_test.go | 788 ------------------ operator/builtin/input/file/rotation_test.go | 450 ++++++++++ operator/builtin/input/file/util_test.go | 190 +++++ 5 files changed, 877 insertions(+), 789 deletions(-) create mode 100644 operator/builtin/input/file/benchmark_test.go create mode 100644 operator/builtin/input/file/rotation_test.go create mode 100644 operator/builtin/input/file/util_test.go diff --git a/operator/builtin/input/file/benchmark_test.go b/operator/builtin/input/file/benchmark_test.go new file mode 100644 index 00000000..f579d17b --- /dev/null +++ b/operator/builtin/input/file/benchmark_test.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" +) + +type fileInputBenchmark struct { + name string + config *InputConfig +} + +func BenchmarkFileInput(b *testing.B) { + cases := []fileInputBenchmark{ + { + "Default", + NewInputConfig("test_id"), + }, + { + "NoFileName", + func() *InputConfig { + cfg := NewInputConfig("test_id") + cfg.IncludeFileName = false + return cfg + }(), + }, + } + + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + tempDir := testutil.NewTempDir(b) + path := filepath.Join(tempDir, "in.log") + + cfg := tc.config + cfg.OutputIDs = []string{"fake"} + cfg.Include = []string{path} + cfg.StartAt = "beginning" + + ops, err := cfg.Build(testutil.NewBuildContext(b)) + require.NoError(b, err) + op := ops[0] + + fakeOutput := testutil.NewFakeOutput(b) + err = op.SetOutputs([]operator.Operator{fakeOutput}) + require.NoError(b, err) + + err = op.Start(testutil.NewMockPersister("test")) + defer op.Stop() + require.NoError(b, err) + + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) + require.NoError(b, err) + + for i := 0; i < b.N; i++ { + file.WriteString("testlog\n") + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + <-fakeOutput.Received + } + }) + } +} diff --git a/operator/builtin/input/file/config_test.go b/operator/builtin/input/file/config_test.go index 172d6e49..a9da65f2 100644 --- a/operator/builtin/input/file/config_test.go +++ b/operator/builtin/input/file/config_test.go @@ -22,11 +22,13 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper/operatortest" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" ) -func TestConfig(t *testing.T) { +func TestUnmarshal(t *testing.T) { cases := []operatortest.ConfigUnmarshalTest{ { Name: "default", @@ -512,6 +514,156 @@ func TestConfig(t *testing.T) { } } +func TestBuild(t *testing.T) { + t.Parallel() + fakeOutput := testutil.NewMockOperator("$.fake") + + basicConfig := func() *InputConfig { + cfg := NewInputConfig("testfile") + cfg.OutputIDs = []string{"fake"} + cfg.Include = []string{"/var/log/testpath.*"} + cfg.Exclude = []string{"/var/log/testpath.ex*"} + cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond} + return cfg + } + + cases := []struct { + name string + modifyBaseConfig func(*InputConfig) + errorRequirement require.ErrorAssertionFunc + validate func(*testing.T, *InputOperator) + }{ + { + "Basic", + func(f *InputConfig) {}, + require.NoError, + func(t *testing.T, f *InputOperator) { + require.Equal(t, f.OutputOperators[0], fakeOutput) + require.Equal(t, f.Include, []string{"/var/log/testpath.*"}) + require.Equal(t, f.FilePathField, entry.NewNilField()) + require.Equal(t, f.FileNameField, entry.NewAttributeField("file_name")) + require.Equal(t, f.PollInterval, 10*time.Millisecond) + }, + }, + { + "BadIncludeGlob", + func(f *InputConfig) { + f.Include = []string{"["} + }, + require.Error, + nil, + }, + { + "BadExcludeGlob", + func(f *InputConfig) { + f.Include = []string{"["} + }, + require.Error, + nil, + }, + { + "MultilineConfiguredStartAndEndPatterns", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineEndPattern: "Exists", + LineStartPattern: "Exists", + } + }, + require.Error, + nil, + }, + { + "MultilineConfiguredStartPattern", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineStartPattern: "START.*", + } + }, + require.NoError, + func(t *testing.T, f *InputOperator) {}, + }, + { + "MultilineConfiguredEndPattern", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineEndPattern: "END.*", + } + }, + require.NoError, + func(t *testing.T, f *InputOperator) {}, + }, + { + "InvalidEncoding", + func(f *InputConfig) { + f.Encoding = helper.EncodingConfig{Encoding: "UTF-3233"} + }, + require.Error, + nil, + }, + { + "LineStartAndEnd", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineStartPattern: ".*", + LineEndPattern: ".*", + } + }, + require.Error, + nil, + }, + { + "NoLineStartOrEnd", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{} + }, + require.NoError, + func(t *testing.T, f *InputOperator) {}, + }, + { + "InvalidLineStartRegex", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineStartPattern: "(", + } + }, + require.Error, + nil, + }, + { + "InvalidLineEndRegex", + func(f *InputConfig) { + f.Multiline = helper.MultilineConfig{ + LineEndPattern: "(", + } + }, + require.Error, + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc := tc + t.Parallel() + cfg := basicConfig() + tc.modifyBaseConfig(cfg) + + ops, err := cfg.Build(testutil.NewBuildContext(t)) + tc.errorRequirement(t, err) + if err != nil { + return + } + op := ops[0] + + err = op.SetOutputs([]operator.Operator{fakeOutput}) + require.NoError(t, err) + + fileInput := op.(*InputOperator) + tc.validate(t, fileInput) + }) + } +} + func defaultCfg() *InputConfig { return NewInputConfig("file_input") } diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 46583f2b..663876b8 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -17,253 +17,20 @@ package file import ( "context" "fmt" - "io" - "io/ioutil" - "log" - "math/rand" "os" "path/filepath" - "runtime" "strconv" "sync" "testing" "time" - "github.com/observiq/nanojack" "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-log-collection/entry" - "github.com/open-telemetry/opentelemetry-log-collection/operator" "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" "github.com/open-telemetry/opentelemetry-log-collection/testutil" ) -func newDefaultConfig(tempDir string) *InputConfig { - cfg := NewInputConfig("testfile") - cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} - cfg.StartAt = "beginning" - cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} - cfg.OutputIDs = []string{"fake"} - return cfg -} - -func newTestFileOperator(t *testing.T, cfgMod func(*InputConfig), outMod func(*testutil.FakeOutput)) (*InputOperator, chan *entry.Entry, string) { - fakeOutput := testutil.NewFakeOutput(t) - if outMod != nil { - outMod(fakeOutput) - } - - tempDir := testutil.NewTempDir(t) - - cfg := newDefaultConfig(tempDir) - if cfgMod != nil { - cfgMod(cfg) - } - ops, err := cfg.Build(testutil.NewBuildContext(t)) - require.NoError(t, err) - op := ops[0] - - err = op.SetOutputs([]operator.Operator{fakeOutput}) - require.NoError(t, err) - - return op.(*InputOperator), fakeOutput.Received, tempDir -} - -func openFile(t testing.TB, path string) *os.File { - file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0777) - require.NoError(t, err) - t.Cleanup(func() { _ = file.Close() }) - return file -} - -func openTemp(t testing.TB, tempDir string) *os.File { - return openTempWithPattern(t, tempDir, "") -} - -func reopenTemp(t testing.TB, name string) *os.File { - return openTempWithPattern(t, filepath.Dir(name), filepath.Base(name)) -} - -func openTempWithPattern(t testing.TB, tempDir, pattern string) *os.File { - file, err := ioutil.TempFile(tempDir, pattern) - require.NoError(t, err) - t.Cleanup(func() { _ = file.Close() }) - return file -} - -func getRotatingLogger(t testing.TB, tempDir string, maxLines, maxBackups int, copyTruncate, sequential bool) *log.Logger { - file, err := ioutil.TempFile(tempDir, "") - require.NoError(t, err) - require.NoError(t, file.Close()) // will be managed by rotator - - rotator := nanojack.Logger{ - Filename: file.Name(), - MaxLines: maxLines, - MaxBackups: maxBackups, - CopyTruncate: copyTruncate, - Sequential: sequential, - } - - t.Cleanup(func() { _ = rotator.Close() }) - - return log.New(&rotator, "", 0) -} - -func writeString(t testing.TB, file *os.File, s string) { - _, err := file.WriteString(s) - require.NoError(t, err) -} - -func TestBuild(t *testing.T) { - t.Parallel() - fakeOutput := testutil.NewMockOperator("$.fake") - - basicConfig := func() *InputConfig { - cfg := NewInputConfig("testfile") - cfg.OutputIDs = []string{"fake"} - cfg.Include = []string{"/var/log/testpath.*"} - cfg.Exclude = []string{"/var/log/testpath.ex*"} - cfg.PollInterval = helper.Duration{Duration: 10 * time.Millisecond} - return cfg - } - - cases := []struct { - name string - modifyBaseConfig func(*InputConfig) - errorRequirement require.ErrorAssertionFunc - validate func(*testing.T, *InputOperator) - }{ - { - "Basic", - func(f *InputConfig) {}, - require.NoError, - func(t *testing.T, f *InputOperator) { - require.Equal(t, f.OutputOperators[0], fakeOutput) - require.Equal(t, f.Include, []string{"/var/log/testpath.*"}) - require.Equal(t, f.FilePathField, entry.NewNilField()) - require.Equal(t, f.FileNameField, entry.NewAttributeField("file_name")) - require.Equal(t, f.PollInterval, 10*time.Millisecond) - }, - }, - { - "BadIncludeGlob", - func(f *InputConfig) { - f.Include = []string{"["} - }, - require.Error, - nil, - }, - { - "BadExcludeGlob", - func(f *InputConfig) { - f.Include = []string{"["} - }, - require.Error, - nil, - }, - { - "MultilineConfiguredStartAndEndPatterns", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineEndPattern: "Exists", - LineStartPattern: "Exists", - } - }, - require.Error, - nil, - }, - { - "MultilineConfiguredStartPattern", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineStartPattern: "START.*", - } - }, - require.NoError, - func(t *testing.T, f *InputOperator) {}, - }, - { - "MultilineConfiguredEndPattern", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineEndPattern: "END.*", - } - }, - require.NoError, - func(t *testing.T, f *InputOperator) {}, - }, - { - "InvalidEncoding", - func(f *InputConfig) { - f.Encoding = helper.EncodingConfig{Encoding: "UTF-3233"} - }, - require.Error, - nil, - }, - { - "LineStartAndEnd", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineStartPattern: ".*", - LineEndPattern: ".*", - } - }, - require.Error, - nil, - }, - { - "NoLineStartOrEnd", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{} - }, - require.NoError, - func(t *testing.T, f *InputOperator) {}, - }, - { - "InvalidLineStartRegex", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineStartPattern: "(", - } - }, - require.Error, - nil, - }, - { - "InvalidLineEndRegex", - func(f *InputConfig) { - f.Multiline = helper.MultilineConfig{ - LineEndPattern: "(", - } - }, - require.Error, - nil, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - tc := tc - t.Parallel() - cfg := basicConfig() - tc.modifyBaseConfig(cfg) - - ops, err := cfg.Build(testutil.NewBuildContext(t)) - tc.errorRequirement(t, err) - if err != nil { - return - } - op := ops[0] - - err = op.SetOutputs([]operator.Operator{fakeOutput}) - require.NoError(t, err) - - fileInput := op.(*InputOperator) - tc.validate(t, fileInput) - }) - } -} - func TestCleanStop(t *testing.T) { t.Parallel() t.Skip(`Skipping due to goroutine leak in opencensus. @@ -556,391 +323,6 @@ func TestMultiFileParallel_LiveFiles(t *testing.T) { wg.Wait() } -func TestMultiFileRotate(t *testing.T) { - t.Parallel() - - getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } - - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - - numFiles := 3 - numMessages := 3 - numRotations := 3 - - expected := make([]string, 0, numFiles*numMessages*numRotations) - for i := 0; i < numFiles; i++ { - for j := 0; j < numMessages; j++ { - for k := 0; k < numRotations; k++ { - expected = append(expected, getMessage(i, k, j)) - } - } - } - - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) - defer operator.Stop() - - temps := make([]*os.File, 0, numFiles) - for i := 0; i < numFiles; i++ { - temps = append(temps, openTemp(t, tempDir)) - } - - var wg sync.WaitGroup - for i, temp := range temps { - wg.Add(1) - go func(tf *os.File, f int) { - defer wg.Done() - for k := 0; k < numRotations; k++ { - for j := 0; j < numMessages; j++ { - writeString(t, tf, getMessage(f, k, j)+"\n") - } - - require.NoError(t, tf.Close()) - require.NoError(t, os.Rename(tf.Name(), fmt.Sprintf("%s.%d", tf.Name(), k))) - tf = reopenTemp(t, tf.Name()) - } - }(temp, i) - } - - waitForMessages(t, logReceived, expected) - wg.Wait() -} - -func TestMultiFileRotateSlow(t *testing.T) { - if runtime.GOOS == "windows" { - // Windows has very poor support for moving active files, so rotation is less commonly used - // This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358 - t.Skip() - } - - t.Parallel() - - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - - getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } - fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } - baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) } - - numFiles := 3 - numMessages := 30 - numRotations := 3 - - expected := make([]string, 0, numFiles*numMessages*numRotations) - for i := 0; i < numFiles; i++ { - for j := 0; j < numMessages; j++ { - for k := 0; k < numRotations; k++ { - expected = append(expected, getMessage(i, k, j)) - } - } - } - - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) - defer operator.Stop() - - var wg sync.WaitGroup - for fileNum := 0; fileNum < numFiles; fileNum++ { - wg.Add(1) - go func(fn int) { - defer wg.Done() - - for rotationNum := 0; rotationNum < numRotations; rotationNum++ { - file := openFile(t, baseFileName(fn)) - for messageNum := 0; messageNum < numMessages; messageNum++ { - writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") - time.Sleep(5 * time.Millisecond) - } - - require.NoError(t, file.Close()) - require.NoError(t, os.Rename(baseFileName(fn), fileName(fn, rotationNum))) - } - }(fileNum) - } - - waitForMessages(t, logReceived, expected) - wg.Wait() -} - -func TestMultiCopyTruncateSlow(t *testing.T) { - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - - getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } - fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } - baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) } - - numFiles := 3 - numMessages := 30 - numRotations := 3 - - expected := make([]string, 0, numFiles*numMessages*numRotations) - for i := 0; i < numFiles; i++ { - for j := 0; j < numMessages; j++ { - for k := 0; k < numRotations; k++ { - expected = append(expected, getMessage(i, k, j)) - } - } - } - - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) - defer operator.Stop() - - var wg sync.WaitGroup - for fileNum := 0; fileNum < numFiles; fileNum++ { - wg.Add(1) - go func(fn int) { - defer wg.Done() - - for rotationNum := 0; rotationNum < numRotations; rotationNum++ { - file := openFile(t, baseFileName(fn)) - for messageNum := 0; messageNum < numMessages; messageNum++ { - writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") - time.Sleep(5 * time.Millisecond) - } - - _, err := file.Seek(0, 0) - require.NoError(t, err) - dst := openFile(t, fileName(fn, rotationNum)) - _, err = io.Copy(dst, file) - require.NoError(t, err) - require.NoError(t, dst.Close()) - require.NoError(t, file.Truncate(0)) - _, err = file.Seek(0, 0) - require.NoError(t, err) - file.Close() - } - }(fileNum) - } - - waitForMessages(t, logReceived, expected) - wg.Wait() -} - -type rotationTest struct { - name string - totalLines int - maxLinesPerFile int - maxBackupFiles int - writeInterval time.Duration - pollInterval time.Duration - ephemeralLines bool -} - -/* - When log files are rotated at extreme speeds, it is possible to miss some log entries. - This can happen when an individual log entry is written and deleted within the duration - of a single poll interval. For example, consider the following scenario: - - A log file may have up to 9 backups (10 total log files) - - Each log file may contain up to 10 entries - - Log entries are written at an interval of 10µs - - Log files are polled at an interval of 100ms - In this scenario, a log entry that is written may only exist on disk for about 1ms. - A polling interval of 100ms will most likely never produce a chance to read the log file. - - In production settings, this consideration is not very likely to be a problem, but it is - easy to encounter the issue in tests, and difficult to deterministically simulate edge cases. - However, the above understanding does allow for some consistent expectations. - 1) Cases that do not require deletion of old log entries should always pass. - 2) Cases where the polling interval is sufficiently rapid should always pass. - 3) When neither 1 nor 2 is true, there may be missing entries, but still no duplicates. - - The following method is provided largely as documentation of how this is expected to behave. - In practice, timing is largely dependent on the responsiveness of system calls. -*/ -func (rt rotationTest) expectEphemeralLines() bool { - // primary + backups - maxLinesInAllFiles := rt.maxLinesPerFile + rt.maxLinesPerFile*rt.maxBackupFiles - - // Will the test write enough lines to result in deletion of oldest backups? - maxBackupsExceeded := rt.totalLines > maxLinesInAllFiles - - // last line written in primary file will exist for l*b more writes - minTimeToLive := time.Duration(int(rt.writeInterval) * rt.maxLinesPerFile * rt.maxBackupFiles) - - // can a line be written and then rotated to deletion before ever observed? - return maxBackupsExceeded && rt.pollInterval > minTimeToLive -} - -func (rt rotationTest) run(tc rotationTest, copyTruncate, sequential bool) func(t *testing.T) { - return func(t *testing.T) { - operator, logReceived, tempDir := newTestFileOperator(t, - func(cfg *InputConfig) { - cfg.PollInterval = helper.NewDuration(tc.pollInterval) - }, - func(out *testutil.FakeOutput) { - out.Received = make(chan *entry.Entry, tc.totalLines) - }, - ) - logger := getRotatingLogger(t, tempDir, tc.maxLinesPerFile, tc.maxBackupFiles, copyTruncate, sequential) - - expected := make([]string, 0, tc.totalLines) - baseStr := stringWithLength(46) // + ' 123' - for i := 0; i < tc.totalLines; i++ { - expected = append(expected, fmt.Sprintf("%s %3d", baseStr, i)) - } - - require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) - defer operator.Stop() - - for _, message := range expected { - logger.Println(message) - time.Sleep(tc.writeInterval) - } - - received := make([]string, 0, tc.totalLines) - LOOP: - for { - select { - case e := <-logReceived: - received = append(received, e.Body.(string)) - case <-time.After(200 * time.Millisecond): - break LOOP - } - } - - if tc.ephemeralLines { - if !tc.expectEphemeralLines() { - // This is helpful for test development, and ensures the sample computation is used - t.Logf("Potentially unstable ephemerality expectation for test: %s", tc.name) - } - require.Subset(t, expected, received) - } else { - require.ElementsMatch(t, expected, received) - } - } -} - -func TestRotation(t *testing.T) { - cases := []rotationTest{ - { - name: "NoRotation", - totalLines: 10, - maxLinesPerFile: 10, - maxBackupFiles: 1, - writeInterval: time.Millisecond, - pollInterval: 10 * time.Millisecond, - }, - { - name: "NoDeletion", - totalLines: 20, - maxLinesPerFile: 10, - maxBackupFiles: 1, - writeInterval: time.Millisecond, - pollInterval: 10 * time.Millisecond, - }, - { - name: "Deletion", - totalLines: 30, - maxLinesPerFile: 10, - maxBackupFiles: 1, - writeInterval: time.Millisecond, - pollInterval: 10 * time.Millisecond, - ephemeralLines: true, - }, - { - name: "Deletion/ExceedFingerprint", - totalLines: 300, - maxLinesPerFile: 100, - maxBackupFiles: 1, - writeInterval: time.Millisecond, - pollInterval: 10 * time.Millisecond, - ephemeralLines: true, - }, - } - - for _, tc := range cases { - t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false)) - t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true)) - t.Run(fmt.Sprintf("%s/CopyTruncateTimestamped", tc.name), tc.run(tc, true, false)) - t.Run(fmt.Sprintf("%s/CopyTruncateSequential", tc.name), tc.run(tc, true, true)) - } -} - -func TestMoveFile(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Moving files while open is unsupported on Windows") - } - t.Parallel() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - operator.persister = testutil.NewMockPersister("test") - - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\n") - temp1.Close() - - operator.poll(context.Background()) - defer operator.Stop() - - waitForMessage(t, logReceived, "testlog1") - - // Wait until all goroutines are finished before renaming - operator.wg.Wait() - err := os.Rename(temp1.Name(), fmt.Sprintf("%s.2", temp1.Name())) - require.NoError(t, err) - - operator.poll(context.Background()) - expectNoMessages(t, logReceived) -} - -// TruncateThenWrite tests that, after a file has been truncated, -// any new writes are picked up -func TestTruncateThenWrite(t *testing.T) { - t.Parallel() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - operator.persister = testutil.NewMockPersister("test") - - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\ntestlog2\n") - - operator.poll(context.Background()) - defer operator.Stop() - - waitForMessage(t, logReceived, "testlog1") - waitForMessage(t, logReceived, "testlog2") - - require.NoError(t, temp1.Truncate(0)) - temp1.Seek(0, 0) - - writeString(t, temp1, "testlog3\n") - operator.poll(context.Background()) - waitForMessage(t, logReceived, "testlog3") - expectNoMessages(t, logReceived) -} - -// CopyTruncateWriteBoth tests that when a file is copied -// with unread logs on the end, then the original is truncated, -// we get the unread logs on the copy as well as any new logs -// written to the truncated file -func TestCopyTruncateWriteBoth(t *testing.T) { - t.Parallel() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - operator.persister = testutil.NewMockPersister("test") - - temp1 := openTemp(t, tempDir) - writeString(t, temp1, "testlog1\ntestlog2\n") - - operator.poll(context.Background()) - defer operator.Stop() - - waitForMessage(t, logReceived, "testlog1") - waitForMessage(t, logReceived, "testlog2") - operator.wg.Wait() // wait for all goroutines to finish - - // Copy the first file to a new file, and add another log - temp2 := openTemp(t, tempDir) - _, err := io.Copy(temp2, temp1) - require.NoError(t, err) - - // Truncate original file - require.NoError(t, temp1.Truncate(0)) - temp1.Seek(0, 0) - - // Write to original and new file - writeString(t, temp2, "testlog3\n") - writeString(t, temp1, "testlog4\n") - - // Expect both messages to come through - operator.poll(context.Background()) - waitForMessages(t, logReceived, []string{"testlog3", "testlog4"}) -} - // OffsetsAfterRestart tests that a operator is able to load // its offsets after a restart func TestOffsetsAfterRestart(t *testing.T) { @@ -1015,38 +397,6 @@ func TestOffsetsAfterRestart_BigFilesWrittenWhileOff(t *testing.T) { waitForMessage(t, logReceived, log2) } -func TestFileMovedWhileOff_BigFiles(t *testing.T) { - t.Parallel() - operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) - persister := testutil.NewMockPersister("test") - - log1 := stringWithLength(1000) - log2 := stringWithLength(1000) - - temp := openTemp(t, tempDir) - writeString(t, temp, log1+"\n") - require.NoError(t, temp.Close()) - - // Start the operator - require.NoError(t, operator.Start(persister)) - defer operator.Stop() - waitForMessage(t, logReceived, log1) - - // Stop the operator, then rename and write a new log - require.NoError(t, operator.Stop()) - - err := os.Rename(temp.Name(), fmt.Sprintf("%s2", temp.Name())) - require.NoError(t, err) - - temp = reopenTemp(t, temp.Name()) - require.NoError(t, err) - writeString(t, temp, log2+"\n") - - // Expect the message written to the new log to come through - require.NoError(t, operator.Start(persister)) - waitForMessage(t, logReceived, log2) -} - func TestManyLogsDelivered(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) @@ -1157,75 +507,6 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { require.Equal(t, []byte("testlog1\n"), reader.Fingerprint.FirstBytes) } -func stringWithLength(length int) string { - charset := "abcdefghijklmnopqrstuvwxyz" - b := make([]byte, length) - for i := range b { - b[i] = charset[rand.Intn(len(charset))] - } - return string(b) -} - -func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { - select { - case e := <-c: - return e - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for message") - return nil - } -} - -func waitForN(t *testing.T, c chan *entry.Entry, n int) []string { - messages := make([]string, 0, n) - for i := 0; i < n; i++ { - select { - case e := <-c: - messages = append(messages, e.Body.(string)) - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for message") - return nil - } - } - return messages -} - -func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { - select { - case e := <-c: - require.Equal(t, expected, e.Body.(string)) - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for message", expected) - } -} - -func waitForMessages(t *testing.T, c chan *entry.Entry, expected []string) { - receivedMessages := make([]string, 0, len(expected)) -LOOP: - for { - select { - case e := <-c: - receivedMessages = append(receivedMessages, e.Body.(string)) - case <-time.After(time.Second): - break LOOP - } - } - - require.ElementsMatch(t, expected, receivedMessages) -} - -func expectNoMessages(t *testing.T, c chan *entry.Entry) { - expectNoMessagesUntil(t, c, 200*time.Millisecond) -} - -func expectNoMessagesUntil(t *testing.T, c chan *entry.Entry, d time.Duration) { - select { - case e := <-c: - require.FailNow(t, "Received unexpected message", "Message: %s", e.Body.(string)) - case <-time.After(d): - } -} - func TestEncodings(t *testing.T) { t.Parallel() cases := []struct { @@ -1311,64 +592,6 @@ func TestEncodings(t *testing.T) { } } -type fileInputBenchmark struct { - name string - config *InputConfig -} - -func BenchmarkFileInput(b *testing.B) { - cases := []fileInputBenchmark{ - { - "Default", - NewInputConfig("test_id"), - }, - { - "NoFileName", - func() *InputConfig { - cfg := NewInputConfig("test_id") - cfg.IncludeFileName = false - return cfg - }(), - }, - } - - for _, tc := range cases { - b.Run(tc.name, func(b *testing.B) { - tempDir := testutil.NewTempDir(b) - path := filepath.Join(tempDir, "in.log") - - cfg := tc.config - cfg.OutputIDs = []string{"fake"} - cfg.Include = []string{path} - cfg.StartAt = "beginning" - - ops, err := cfg.Build(testutil.NewBuildContext(b)) - require.NoError(b, err) - op := ops[0] - - fakeOutput := testutil.NewFakeOutput(b) - err = op.SetOutputs([]operator.Operator{fakeOutput}) - require.NoError(b, err) - - err = op.Start(testutil.NewMockPersister("test")) - defer op.Stop() - require.NoError(b, err) - - file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) - require.NoError(b, err) - - for i := 0; i < b.N; i++ { - file.WriteString("testlog\n") - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - <-fakeOutput.Received - } - }) - } -} - // TestExclude tests that a log file will be excluded if it matches the // glob specified in the operator. func TestExclude(t *testing.T) { @@ -1411,14 +634,3 @@ func TestExcludeDuplicates(t *testing.T) { matches := getMatches(includes, excludes) require.ElementsMatch(t, matches, paths[2:3]) } - -// writes file with the specified file names and returns their full paths in order -func writeTempFiles(tempDir string, names []string) []string { - result := make([]string, 0, len(names)) - for _, name := range names { - path := filepath.Join(tempDir, name) - ioutil.WriteFile(path, []byte(name), 0755) - result = append(result, path) - } - return result -} diff --git a/operator/builtin/input/file/rotation_test.go b/operator/builtin/input/file/rotation_test.go new file mode 100644 index 00000000..72a43b28 --- /dev/null +++ b/operator/builtin/input/file/rotation_test.go @@ -0,0 +1,450 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" +) + +func TestMultiFileRotate(t *testing.T) { + t.Parallel() + + getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } + + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + + numFiles := 3 + numMessages := 3 + numRotations := 3 + + expected := make([]string, 0, numFiles*numMessages*numRotations) + for i := 0; i < numFiles; i++ { + for j := 0; j < numMessages; j++ { + for k := 0; k < numRotations; k++ { + expected = append(expected, getMessage(i, k, j)) + } + } + } + + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + temps := make([]*os.File, 0, numFiles) + for i := 0; i < numFiles; i++ { + temps = append(temps, openTemp(t, tempDir)) + } + + var wg sync.WaitGroup + for i, temp := range temps { + wg.Add(1) + go func(tf *os.File, f int) { + defer wg.Done() + for k := 0; k < numRotations; k++ { + for j := 0; j < numMessages; j++ { + writeString(t, tf, getMessage(f, k, j)+"\n") + } + + require.NoError(t, tf.Close()) + require.NoError(t, os.Rename(tf.Name(), fmt.Sprintf("%s.%d", tf.Name(), k))) + tf = reopenTemp(t, tf.Name()) + } + }(temp, i) + } + + waitForMessages(t, logReceived, expected) + wg.Wait() +} + +func TestMultiFileRotateSlow(t *testing.T) { + if runtime.GOOS == "windows" { + // Windows has very poor support for moving active files, so rotation is less commonly used + // This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358 + t.Skip() + } + + t.Parallel() + + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + + getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } + fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } + baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) } + + numFiles := 3 + numMessages := 30 + numRotations := 3 + + expected := make([]string, 0, numFiles*numMessages*numRotations) + for i := 0; i < numFiles; i++ { + for j := 0; j < numMessages; j++ { + for k := 0; k < numRotations; k++ { + expected = append(expected, getMessage(i, k, j)) + } + } + } + + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + var wg sync.WaitGroup + for fileNum := 0; fileNum < numFiles; fileNum++ { + wg.Add(1) + go func(fn int) { + defer wg.Done() + + for rotationNum := 0; rotationNum < numRotations; rotationNum++ { + file := openFile(t, baseFileName(fn)) + for messageNum := 0; messageNum < numMessages; messageNum++ { + writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") + time.Sleep(5 * time.Millisecond) + } + + require.NoError(t, file.Close()) + require.NoError(t, os.Rename(baseFileName(fn), fileName(fn, rotationNum))) + } + }(fileNum) + } + + waitForMessages(t, logReceived, expected) + wg.Wait() +} + +func TestMultiCopyTruncateSlow(t *testing.T) { + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + + getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } + fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) } + baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) } + + numFiles := 3 + numMessages := 30 + numRotations := 3 + + expected := make([]string, 0, numFiles*numMessages*numRotations) + for i := 0; i < numFiles; i++ { + for j := 0; j < numMessages; j++ { + for k := 0; k < numRotations; k++ { + expected = append(expected, getMessage(i, k, j)) + } + } + } + + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + var wg sync.WaitGroup + for fileNum := 0; fileNum < numFiles; fileNum++ { + wg.Add(1) + go func(fn int) { + defer wg.Done() + + for rotationNum := 0; rotationNum < numRotations; rotationNum++ { + file := openFile(t, baseFileName(fn)) + for messageNum := 0; messageNum < numMessages; messageNum++ { + writeString(t, file, getMessage(fn, rotationNum, messageNum)+"\n") + time.Sleep(5 * time.Millisecond) + } + + _, err := file.Seek(0, 0) + require.NoError(t, err) + dst := openFile(t, fileName(fn, rotationNum)) + _, err = io.Copy(dst, file) + require.NoError(t, err) + require.NoError(t, dst.Close()) + require.NoError(t, file.Truncate(0)) + _, err = file.Seek(0, 0) + require.NoError(t, err) + file.Close() + } + }(fileNum) + } + + waitForMessages(t, logReceived, expected) + wg.Wait() +} + +type rotationTest struct { + name string + totalLines int + maxLinesPerFile int + maxBackupFiles int + writeInterval time.Duration + pollInterval time.Duration + ephemeralLines bool +} + +/* + When log files are rotated at extreme speeds, it is possible to miss some log entries. + This can happen when an individual log entry is written and deleted within the duration + of a single poll interval. For example, consider the following scenario: + - A log file may have up to 9 backups (10 total log files) + - Each log file may contain up to 10 entries + - Log entries are written at an interval of 10µs + - Log files are polled at an interval of 100ms + In this scenario, a log entry that is written may only exist on disk for about 1ms. + A polling interval of 100ms will most likely never produce a chance to read the log file. + + In production settings, this consideration is not very likely to be a problem, but it is + easy to encounter the issue in tests, and difficult to deterministically simulate edge cases. + However, the above understanding does allow for some consistent expectations. + 1) Cases that do not require deletion of old log entries should always pass. + 2) Cases where the polling interval is sufficiently rapid should always pass. + 3) When neither 1 nor 2 is true, there may be missing entries, but still no duplicates. + + The following method is provided largely as documentation of how this is expected to behave. + In practice, timing is largely dependent on the responsiveness of system calls. +*/ +func (rt rotationTest) expectEphemeralLines() bool { + // primary + backups + maxLinesInAllFiles := rt.maxLinesPerFile + rt.maxLinesPerFile*rt.maxBackupFiles + + // Will the test write enough lines to result in deletion of oldest backups? + maxBackupsExceeded := rt.totalLines > maxLinesInAllFiles + + // last line written in primary file will exist for l*b more writes + minTimeToLive := time.Duration(int(rt.writeInterval) * rt.maxLinesPerFile * rt.maxBackupFiles) + + // can a line be written and then rotated to deletion before ever observed? + return maxBackupsExceeded && rt.pollInterval > minTimeToLive +} + +func (rt rotationTest) run(tc rotationTest, copyTruncate, sequential bool) func(t *testing.T) { + return func(t *testing.T) { + operator, logReceived, tempDir := newTestFileOperator(t, + func(cfg *InputConfig) { + cfg.PollInterval = helper.NewDuration(tc.pollInterval) + }, + func(out *testutil.FakeOutput) { + out.Received = make(chan *entry.Entry, tc.totalLines) + }, + ) + logger := getRotatingLogger(t, tempDir, tc.maxLinesPerFile, tc.maxBackupFiles, copyTruncate, sequential) + + expected := make([]string, 0, tc.totalLines) + baseStr := stringWithLength(46) // + ' 123' + for i := 0; i < tc.totalLines; i++ { + expected = append(expected, fmt.Sprintf("%s %3d", baseStr, i)) + } + + require.NoError(t, operator.Start(testutil.NewMockPersister("test"))) + defer operator.Stop() + + for _, message := range expected { + logger.Println(message) + time.Sleep(tc.writeInterval) + } + + received := make([]string, 0, tc.totalLines) + LOOP: + for { + select { + case e := <-logReceived: + received = append(received, e.Body.(string)) + case <-time.After(200 * time.Millisecond): + break LOOP + } + } + + if tc.ephemeralLines { + if !tc.expectEphemeralLines() { + // This is helpful for test development, and ensures the sample computation is used + t.Logf("Potentially unstable ephemerality expectation for test: %s", tc.name) + } + require.Subset(t, expected, received) + } else { + require.ElementsMatch(t, expected, received) + } + } +} + +func TestRotation(t *testing.T) { + cases := []rotationTest{ + { + name: "NoRotation", + totalLines: 10, + maxLinesPerFile: 10, + maxBackupFiles: 1, + writeInterval: time.Millisecond, + pollInterval: 10 * time.Millisecond, + }, + { + name: "NoDeletion", + totalLines: 20, + maxLinesPerFile: 10, + maxBackupFiles: 1, + writeInterval: time.Millisecond, + pollInterval: 10 * time.Millisecond, + }, + { + name: "Deletion", + totalLines: 30, + maxLinesPerFile: 10, + maxBackupFiles: 1, + writeInterval: time.Millisecond, + pollInterval: 10 * time.Millisecond, + ephemeralLines: true, + }, + { + name: "Deletion/ExceedFingerprint", + totalLines: 300, + maxLinesPerFile: 100, + maxBackupFiles: 1, + writeInterval: time.Millisecond, + pollInterval: 10 * time.Millisecond, + ephemeralLines: true, + }, + } + + for _, tc := range cases { + t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false)) + t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true)) + t.Run(fmt.Sprintf("%s/CopyTruncateTimestamped", tc.name), tc.run(tc, true, false)) + t.Run(fmt.Sprintf("%s/CopyTruncateSequential", tc.name), tc.run(tc, true, true)) + } +} + +func TestMoveFile(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Moving files while open is unsupported on Windows") + } + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator.persister = testutil.NewMockPersister("test") + + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\n") + temp1.Close() + + operator.poll(context.Background()) + defer operator.Stop() + + waitForMessage(t, logReceived, "testlog1") + + // Wait until all goroutines are finished before renaming + operator.wg.Wait() + err := os.Rename(temp1.Name(), fmt.Sprintf("%s.2", temp1.Name())) + require.NoError(t, err) + + operator.poll(context.Background()) + expectNoMessages(t, logReceived) +} + +// TruncateThenWrite tests that, after a file has been truncated, +// any new writes are picked up +func TestTruncateThenWrite(t *testing.T) { + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator.persister = testutil.NewMockPersister("test") + + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\ntestlog2\n") + + operator.poll(context.Background()) + defer operator.Stop() + + waitForMessage(t, logReceived, "testlog1") + waitForMessage(t, logReceived, "testlog2") + + require.NoError(t, temp1.Truncate(0)) + temp1.Seek(0, 0) + + writeString(t, temp1, "testlog3\n") + operator.poll(context.Background()) + waitForMessage(t, logReceived, "testlog3") + expectNoMessages(t, logReceived) +} + +// CopyTruncateWriteBoth tests that when a file is copied +// with unread logs on the end, then the original is truncated, +// we get the unread logs on the copy as well as any new logs +// written to the truncated file +func TestCopyTruncateWriteBoth(t *testing.T) { + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator.persister = testutil.NewMockPersister("test") + + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\ntestlog2\n") + + operator.poll(context.Background()) + defer operator.Stop() + + waitForMessage(t, logReceived, "testlog1") + waitForMessage(t, logReceived, "testlog2") + operator.wg.Wait() // wait for all goroutines to finish + + // Copy the first file to a new file, and add another log + temp2 := openTemp(t, tempDir) + _, err := io.Copy(temp2, temp1) + require.NoError(t, err) + + // Truncate original file + require.NoError(t, temp1.Truncate(0)) + temp1.Seek(0, 0) + + // Write to original and new file + writeString(t, temp2, "testlog3\n") + writeString(t, temp1, "testlog4\n") + + // Expect both messages to come through + operator.poll(context.Background()) + waitForMessages(t, logReceived, []string{"testlog3", "testlog4"}) +} + +func TestFileMovedWhileOff_BigFiles(t *testing.T) { + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + persister := testutil.NewMockPersister("test") + + log1 := stringWithLength(1000) + log2 := stringWithLength(1000) + + temp := openTemp(t, tempDir) + writeString(t, temp, log1+"\n") + require.NoError(t, temp.Close()) + + // Start the operator + require.NoError(t, operator.Start(persister)) + defer operator.Stop() + waitForMessage(t, logReceived, log1) + + // Stop the operator, then rename and write a new log + require.NoError(t, operator.Stop()) + + err := os.Rename(temp.Name(), fmt.Sprintf("%s2", temp.Name())) + require.NoError(t, err) + + temp = reopenTemp(t, temp.Name()) + require.NoError(t, err) + writeString(t, temp, log2+"\n") + + // Expect the message written to the new log to come through + require.NoError(t, operator.Start(persister)) + waitForMessage(t, logReceived, log2) +} diff --git a/operator/builtin/input/file/util_test.go b/operator/builtin/input/file/util_test.go new file mode 100644 index 00000000..37920f4d --- /dev/null +++ b/operator/builtin/input/file/util_test.go @@ -0,0 +1,190 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "fmt" + "io/ioutil" + "log" + "math/rand" + "os" + "path/filepath" + "testing" + "time" + + "github.com/observiq/nanojack" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" +) + +func newDefaultConfig(tempDir string) *InputConfig { + cfg := NewInputConfig("testfile") + cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} + cfg.StartAt = "beginning" + cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} + cfg.OutputIDs = []string{"fake"} + return cfg +} + +func newTestFileOperator(t *testing.T, cfgMod func(*InputConfig), outMod func(*testutil.FakeOutput)) (*InputOperator, chan *entry.Entry, string) { + fakeOutput := testutil.NewFakeOutput(t) + if outMod != nil { + outMod(fakeOutput) + } + + tempDir := testutil.NewTempDir(t) + + cfg := newDefaultConfig(tempDir) + if cfgMod != nil { + cfgMod(cfg) + } + ops, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + op := ops[0] + + err = op.SetOutputs([]operator.Operator{fakeOutput}) + require.NoError(t, err) + + return op.(*InputOperator), fakeOutput.Received, tempDir +} + +func openFile(t testing.TB, path string) *os.File { + file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0777) + require.NoError(t, err) + t.Cleanup(func() { _ = file.Close() }) + return file +} + +func openTemp(t testing.TB, tempDir string) *os.File { + return openTempWithPattern(t, tempDir, "") +} + +func reopenTemp(t testing.TB, name string) *os.File { + return openTempWithPattern(t, filepath.Dir(name), filepath.Base(name)) +} + +func openTempWithPattern(t testing.TB, tempDir, pattern string) *os.File { + file, err := ioutil.TempFile(tempDir, pattern) + require.NoError(t, err) + t.Cleanup(func() { _ = file.Close() }) + return file +} + +func getRotatingLogger(t testing.TB, tempDir string, maxLines, maxBackups int, copyTruncate, sequential bool) *log.Logger { + file, err := ioutil.TempFile(tempDir, "") + require.NoError(t, err) + require.NoError(t, file.Close()) // will be managed by rotator + + rotator := nanojack.Logger{ + Filename: file.Name(), + MaxLines: maxLines, + MaxBackups: maxBackups, + CopyTruncate: copyTruncate, + Sequential: sequential, + } + + t.Cleanup(func() { _ = rotator.Close() }) + + return log.New(&rotator, "", 0) +} + +func writeString(t testing.TB, file *os.File, s string) { + _, err := file.WriteString(s) + require.NoError(t, err) +} + +func stringWithLength(length int) string { + charset := "abcdefghijklmnopqrstuvwxyz" + b := make([]byte, length) + for i := range b { + b[i] = charset[rand.Intn(len(charset))] + } + return string(b) +} + +func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { + select { + case e := <-c: + return e + case <-time.After(time.Second): + require.FailNow(t, "Timed out waiting for message") + return nil + } +} + +func waitForN(t *testing.T, c chan *entry.Entry, n int) []string { + messages := make([]string, 0, n) + for i := 0; i < n; i++ { + select { + case e := <-c: + messages = append(messages, e.Body.(string)) + case <-time.After(time.Second): + require.FailNow(t, "Timed out waiting for message") + return nil + } + } + return messages +} + +func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { + select { + case e := <-c: + require.Equal(t, expected, e.Body.(string)) + case <-time.After(time.Second): + require.FailNow(t, "Timed out waiting for message", expected) + } +} + +func waitForMessages(t *testing.T, c chan *entry.Entry, expected []string) { + receivedMessages := make([]string, 0, len(expected)) +LOOP: + for { + select { + case e := <-c: + receivedMessages = append(receivedMessages, e.Body.(string)) + case <-time.After(time.Second): + break LOOP + } + } + + require.ElementsMatch(t, expected, receivedMessages) +} + +func expectNoMessages(t *testing.T, c chan *entry.Entry) { + expectNoMessagesUntil(t, c, 200*time.Millisecond) +} + +func expectNoMessagesUntil(t *testing.T, c chan *entry.Entry, d time.Duration) { + select { + case e := <-c: + require.FailNow(t, "Received unexpected message", "Message: %s", e.Body.(string)) + case <-time.After(d): + } +} + +// writes file with the specified file names and returns their full paths in order +func writeTempFiles(tempDir string, names []string) []string { + result := make([]string, 0, len(names)) + for _, name := range names { + path := filepath.Join(tempDir, name) + ioutil.WriteFile(path, []byte(name), 0755) + result = append(result, path) + } + return result +}