From 2151d157819b40b7a70b6deeab5aa46af25b57b6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?=
Date: Fri, 16 Oct 2020 11:18:44 +0200
Subject: [PATCH] Add tests for fileProspector in filestream input (#21712)

## What does this PR do?

This PR adds tests to see how `fileProspector` handles Create, Write, and Delete operations.
In order to make the `Prospector` testable, I changed `HarvesterGroup` to an interface so it can be mocked.
During testing, an issue with the path identifier showed up when a file was deleted: the identifier generated an incorrect value for `Name`. Now it is fixed.
---
 filebeat/input/filestream/identifier.go | 6 +-
 .../internal/input-logfile/harvester.go | 10 +-
 .../internal/input-logfile/input.go | 2 +-
 .../internal/input-logfile/prospector.go | 2 +-
 filebeat/input/filestream/prospector.go | 15 +-
 filebeat/input/filestream/prospector_test.go | 197 ++++++++++++++++++
 6 files changed, 218 insertions(+), 14 deletions(-)
 create mode 100644 filebeat/input/filestream/prospector_test.go

diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go
index 736c66da2f0..63883383a1c 100644
--- a/filebeat/input/filestream/identifier.go
+++ b/filebeat/input/filestream/identifier.go
@@ -116,11 +116,15 @@ func newPathIdentifier(_ *common.Config) (fileIdentifier, error) {
 }
 
 func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
+	path := e.NewPath
+	if e.Op == loginp.OpDelete {
+		path = e.OldPath
+	}
 	return fileSource{
 		info: e.Info,
 		newPath: e.NewPath,
 		oldPath: e.OldPath,
-		name: pluginName + identitySep + p.name + identitySep + e.NewPath,
+		name: pluginName + identitySep + p.name + identitySep + path,
 		identifierGenerator: p.name,
 	}
 }
diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go
index d2f184cac7b..3c7573ad460 100644
--- a/filebeat/input/filestream/internal/input-logfile/harvester.go
+++ b/filebeat/input/filestream/internal/input-logfile/harvester.go
@@ -43,7 +43,11 @@ type Harvester interface {
 
 // HarvesterGroup is responsible for running the
 // Harvesters started by the Prospector.
-type HarvesterGroup struct {
+type HarvesterGroup interface {
+	Run(input.Context, Source) error
+}
+
+type defaultHarvesterGroup struct {
 	manager *InputManager
 	readers map[string]context.CancelFunc
 	pipeline beat.PipelineConnector
@@ -54,7 +58,7 @@ type HarvesterGroup struct {
 }
 
 // Run starts the Harvester for a Source.
-func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error {
+func (hg *defaultHarvesterGroup) Run(ctx input.Context, s Source) error {
 	log := ctx.Logger.With("source", s.Name())
 	log.Debug("Starting harvester for file")
 
@@ -111,7 +115,7 @@ func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error {
 }
 
 // Cancel stops the running Harvester for a given Source.
-func (hg *HarvesterGroup) Cancel(s Source) error { +func (hg *defaultHarvesterGroup) Cancel(s Source) error { if cancel, ok := hg.readers[s.Name()]; ok { cancel() return nil diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index 7084315b0c1..11092479cf3 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -59,7 +59,7 @@ func (inp *managedInput) Run( store.Retain() defer store.Release() - hg := &HarvesterGroup{ + hg := &defaultHarvesterGroup{ pipeline: pipeline, readers: make(map[string]context.CancelFunc), manager: inp.manager, diff --git a/filebeat/input/filestream/internal/input-logfile/prospector.go b/filebeat/input/filestream/internal/input-logfile/prospector.go index 9488596eb2c..185d6f9ec7e 100644 --- a/filebeat/input/filestream/internal/input-logfile/prospector.go +++ b/filebeat/input/filestream/internal/input-logfile/prospector.go @@ -28,7 +28,7 @@ import ( type Prospector interface { // Run starts the event loop and handles the incoming events // either by starting/stopping a harvester, or updating the statestore. - Run(input.Context, *statestore.Store, *HarvesterGroup) + Run(input.Context, *statestore.Store, HarvesterGroup) // Test checks if the Prospector is able to run the configuration // specified by the user. Test() error diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 94670e18ce7..11f479ccef8 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -72,7 +72,7 @@ func newFileProspector( } // Run starts the fileProspector which accepts FS events from a file watcher. -func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.HarvesterGroup) { +func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg loginp.HarvesterGroup) { log := ctx.Logger.With("prospector", prospectorDebugKey) log.Debug("Starting prospector") defer log.Debug("Prospector has stopped") @@ -100,8 +100,12 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp. src := p.identifier.GetSource(fe) switch fe.Op { - case loginp.OpCreate: - log.Debugf("A new file %s has been found", fe.NewPath) + case loginp.OpCreate, loginp.OpWrite: + if fe.Op == loginp.OpCreate { + log.Debugf("A new file %s has been found", fe.NewPath) + } else if fe.Op == loginp.OpWrite { + log.Debugf("File %s has been updated", fe.NewPath) + } if p.ignoreOlder > 0 { now := time.Now() @@ -113,11 +117,6 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp. hg.Run(ctx, src) - case loginp.OpWrite: - log.Debugf("File %s has been updated", fe.NewPath) - - hg.Run(ctx, src) - case loginp.OpDelete: log.Debugf("File %s has been removed", fe.OldPath) diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go new file mode 100644 index 00000000000..1f75b12d2bd --- /dev/null +++ b/filebeat/input/filestream/prospector_test.go @@ -0,0 +1,197 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/elastic/go-concert/unison" +) + +func TestProspectorNewAndUpdatedFiles(t *testing.T) { + minuteAgo := time.Now().Add(-1 * time.Minute) + + testCases := map[string]struct { + events []loginp.FSEvent + ignoreOlder time.Duration + expectedSources []string + }{ + "two new files": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/other/file"}, + }, + expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"}, + }, + "one updated file": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file"}, + }, + expectedSources: []string{"filestream::path::/path/to/file"}, + }, + "old files with ignore older configured": { + events: []loginp.FSEvent{ + loginp.FSEvent{ + Op: loginp.OpCreate, + NewPath: "/path/to/file", + Info: testFileInfo{"/path/to/file", 5, minuteAgo}, + }, + loginp.FSEvent{ + Op: loginp.OpWrite, + NewPath: "/path/to/other/file", + Info: testFileInfo{"/path/to/other/file", 5, minuteAgo}, + }, + }, + ignoreOlder: 10 * time.Second, + expectedSources: []string{}, + }, + "newer files with ignore older": { + events: []loginp.FSEvent{ + loginp.FSEvent{ + Op: loginp.OpCreate, + NewPath: "/path/to/file", + Info: testFileInfo{"/path/to/file", 5, minuteAgo}, + }, + loginp.FSEvent{ + Op: loginp.OpWrite, + NewPath: "/path/to/other/file", + Info: testFileInfo{"/path/to/other/file", 5, minuteAgo}, + }, + }, + ignoreOlder: 5 * time.Minute, + expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"}, + }, + } + + for name, test := range testCases { + test := test + + t.Run(name, func(t *testing.T) { + p := fileProspector{ + filewatcher: &mockFileWatcher{events: test.events}, + identifier: mustPathIdentifier(), + ignoreOlder: test.ignoreOlder, + } + ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()} + hg := getTestHarvesterGroup() + + p.Run(ctx, testStateStore(), hg) + + assert.ElementsMatch(t, hg.encounteredNames, test.expectedSources) + }) + } +} + +func TestProspectorDeletedFile(t *testing.T) { + testCases := map[string]struct { + events []loginp.FSEvent + cleanRemoved bool + }{ + "one deleted file without clean removed": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"}, + }, + cleanRemoved: false, + }, + "one deleted file with clean removed": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"}, + }, + cleanRemoved: true, + }, + } + + for name, test := range testCases { + test := test + + t.Run(name, func(t *testing.T) { + p := 
fileProspector{ + filewatcher: &mockFileWatcher{events: test.events}, + identifier: mustPathIdentifier(), + cleanRemoved: test.cleanRemoved, + } + ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()} + + testStore := testStateStore() + testStore.Set("filestream::path::/path/to/file", nil) + + p.Run(ctx, testStore, getTestHarvesterGroup()) + + has, err := testStore.Has("filestream::path::/path/to/file") + if err != nil { + t.Fatal(err) + } + + if test.cleanRemoved { + assert.False(t, has) + } else { + assert.True(t, has) + + } + }) + } +} + +type testHarvesterGroup struct { + encounteredNames []string +} + +func getTestHarvesterGroup() *testHarvesterGroup { return &testHarvesterGroup{make([]string, 0)} } + +func (t *testHarvesterGroup) Run(_ input.Context, s loginp.Source) error { + t.encounteredNames = append(t.encounteredNames, s.Name()) + return nil +} + +type mockFileWatcher struct { + nextIdx int + events []loginp.FSEvent +} + +func (m *mockFileWatcher) Event() loginp.FSEvent { + if len(m.events) == m.nextIdx { + return loginp.FSEvent{} + } + evt := m.events[m.nextIdx] + m.nextIdx++ + return evt +} +func (m *mockFileWatcher) Run(_ unison.Canceler) { return } + +func testStateStore() *statestore.Store { + s, _ := statestore.NewRegistry(storetest.NewMemoryStoreBackend()).Get(pluginName) + return s +} + +func mustPathIdentifier() fileIdentifier { + pathIdentifier, err := newPathIdentifier(nil) + if err != nil { + panic(err) + } + return pathIdentifier + +}
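
The path-identifier fix is only exercised indirectly by the prospector tests above. Below is a minimal sketch of a more direct test of that behaviour; it is not part of the patch, the test name is hypothetical, and it assumes the file sits next to `prospector_test.go` in the `filestream` package so it can reuse the `mustPathIdentifier` helper. The point it illustrates: a delete event carries only `OldPath`, so the generated source name must be built from it rather than from the empty `NewPath`.

```go
package filestream

import (
	"testing"

	"github.com/stretchr/testify/assert"

	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
)

// Hypothetical test exercising the pathIdentifier fix directly: a delete
// event only populates OldPath, so the source name must be derived from it.
func TestPathIdentifierDeleteUsesOldPath(t *testing.T) {
	identifier := mustPathIdentifier() // helper from prospector_test.go

	evt := loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"}
	src := identifier.GetSource(evt)

	// Before the fix the name was built from the empty NewPath and ended in
	// the separator ("filestream::path::"); now it ends with the deleted path.
	assert.Equal(t, "filestream::path::/path/to/file", src.Name())
}
```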