Skip to content

Commit

Permalink
Merge pull request #647 from ruflin/harvester-finishing
Browse files Browse the repository at this point in the history
Close filebeat if prospector without paths exists
  • Loading branch information
Steffen Siering committed Jan 7, 2016
2 parents 0ddc1b2 + 6fbf443 commit a54284f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]

*Filebeat*
- Add exclude_files configuration option {pull}563[563]
- Stop filebeat if filebeat is started without any prospectors defined {pull}644[644]
- Stop filebeat if filebeat is started without any prospectors defined or empty prospectors {pull}644[644] {pull}647[647]

*Winlogbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (p *Prospector) setupProspectorConfig() error {
// Init File Stat list
p.prospectorList = make(map[string]harvester.FileStat)

if config.Harvester.InputType == cfg.LogInputType && len(config.Paths) == 0 {
return fmt.Errorf("No paths were defined for prospector")
}

return nil
}

Expand Down
18 changes: 18 additions & 0 deletions filebeat/crawler/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func TestProspectorInitInvalidIgnoreOlder(t *testing.T) {
func TestProspectorInitInputTypeLog(t *testing.T) {

prospectorConfig := config.ProspectorConfig{
Paths: []string{"testpath1", "testpath2"},
Harvester: config.HarvesterConfig{
InputType: "log",
},
Expand All @@ -138,6 +139,23 @@ func TestProspectorInitInputTypeLog(t *testing.T) {
assert.Equal(t, "log", prospector.ProspectorConfig.Harvester.InputType)
}

func TestProspectorInitInputTypeLogError(t *testing.T) {

prospectorConfig := config.ProspectorConfig{
Harvester: config.HarvesterConfig{
InputType: "log",
},
}

prospector := Prospector{
ProspectorConfig: prospectorConfig,
}

err := prospector.Init()
// Error should be returned because no path is set
assert.Error(t, err)
}

func TestProspectorInitInputTypeStdin(t *testing.T) {

prospectorConfig := config.ProspectorConfig{
Expand Down
66 changes: 63 additions & 3 deletions filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def test_rotating_ignore_older_low_write_rate(self):
os.mkdir(self.working_dir + "/log/")
testfile = self.working_dir + "/log/test.log"

proc = self.start_filebeat(debug_selectors=['*'])
filebeat = self.start_filebeat(debug_selectors=['*'])

# wait for first "Start next scan" log message
self.wait_until(
Expand Down Expand Up @@ -230,14 +230,17 @@ def test_rotating_ignore_older_low_write_rate(self):
lambda: self.output_count(lambda x: x >= lines),
max_timeout=5)

proc.kill_and_wait()
filebeat.kill_and_wait()

def test_shutdown_no_prospectors(self):
"""
In case no prospectors are defined, filebeat must shut down and report an error
"""
self.render_config_template(
prospectors=False,
)

proc = self.start_filebeat(debug_selectors=['*'])
filebeat = self.start_filebeat(debug_selectors=['*'])

# wait for first "Start next scan" log message
self.wait_until(
Expand All @@ -249,3 +252,60 @@ def test_shutdown_no_prospectors(self):
lambda: self.log_contains(
"shutting down"),
max_timeout=10)

filebeat.kill_and_wait()


def test_no_paths_defined(self):
"""
In case a prospector is defined but doesn't contain any paths, prospector must return error which
leads to shutdown of filebeat because of configuration error
"""
self.render_config_template(
)

filebeat = self.start_filebeat(debug_selectors=['*'])

# wait for first "Start next scan" log message
self.wait_until(
lambda: self.log_contains(
"No paths were defined for prospector"),
max_timeout=10)

self.wait_until(
lambda: self.log_contains(
"shutting down"),
max_timeout=10)

filebeat.kill_and_wait()


def test_files_added_late(self):
"""
Tests that prospectors stay running even though no harvesters are started yet
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
)

os.mkdir(self.working_dir + "/log/")

filebeat = self.start_filebeat(debug_selectors=['*'])

# wait until events are sent for the first time
self.wait_until(
lambda: self.log_contains(
"Events flushed"),
max_timeout=10)

testfile = self.working_dir + "/log/test.log"
with open(testfile, 'a') as file:
file.write("Hello World1\n")
file.write("Hello World2\n")

# wait for log to be read
self.wait_until(
lambda: self.output_has(lines=2),
max_timeout=15)

filebeat.kill_and_wait()

0 comments on commit a54284f

Please sign in to comment.