Skip to content

Commit

Permalink
Prospector level processors (#3853)
Browse files Browse the repository at this point in the history
* System tests for prospector level processors
* Adding registrar test
  • Loading branch information
vjsamuel authored and ruflin committed Apr 5, 2017
1 parent 37dabda commit e73bb6b
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- close_timeout is also applied when the output is blocking. {pull}3511[3511]
- Improve handling of different path variants on Windows. {pull}3781[3781]
- Restructure input.Event to be inline with outputs.Data {pull}3823[3823]
- Add base for supporting prospector level processors {pull}3853[3853]

*Heartbeat*

Expand Down
36 changes: 19 additions & 17 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/processors"
)

var (
Expand All @@ -26,23 +27,24 @@ var (
)

type prospectorConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Enabled bool `config:"enabled"`
DocumentType string `config:"document_type"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
JSON *reader.JSONConfig `config:"json"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Enabled bool `config:"enabled"`
DocumentType string `config:"document_type"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
JSON *reader.JSONConfig `config:"json"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Processors processors.PluginConfig `config:"processors"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
18 changes: 18 additions & 0 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/processors"
)

var (
Expand All @@ -40,6 +41,7 @@ type Prospector struct {
registry *harvesterRegistry
beatDone chan struct{}
eventCounter *sync.WaitGroup
processors *processors.Processors
}

// Prospectorer is the interface common to all prospectors
Expand Down Expand Up @@ -84,6 +86,13 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*
return nil, err
}

f, err := processors.New(prospector.config.Processors)
if err != nil {
return nil, err
}

prospector.processors = f

logp.Debug("prospector", "File Configs: %v", prospector.config.Paths)

return prospector, nil
Expand Down Expand Up @@ -215,6 +224,15 @@ func (p *Prospector) updateState(event *input.Event) error {
event.Fileset = p.config.Fileset

eventHolder := event.GetData()
//run the filters before sending to spooler
if event.Bytes > 0 {
eventHolder.Event = p.processors.Run(eventHolder.Event)
}

if eventHolder.Event == nil {
eventHolder.Metadata.Bytes = 0
}

ok := p.outlet.OnEvent(&eventHolder)

if !ok {
Expand Down
15 changes: 14 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,20 @@ filebeat.prospectors:
harvester_limit: {{harvester_limit | default(0) }}
symlinks: {{symlinks}}
pipeline: {{pipeline}}

{%- if prospector_processors %}
processors:
{%- for processor in prospector_processors %}
{%- for name, settings in processor.iteritems() %}
- {{name}}:
{%- if settings %}
{%- for k, v in settings.iteritems() %}
{{k}}:
{{v | default([])}}
{%- endfor %}
{%- endif %}
{%- endfor %}
{%- endfor %}
{% endif %}
{% if fields %}
fields:
{% for k,v in fields.items() %}
Expand Down
50 changes: 50 additions & 0 deletions filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,53 @@ def test_harvester_limit(self):
assert len(data) == 3

filebeat.check_kill_and_wait()

def test_prospector_filter_dropfields(self):
"""
Check drop_fields filtering action at a prospector level
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
prospector_processors=[{
"drop_fields": {
"fields": ["offset"],
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "offset" not in output
assert "message" in output

def test_prospector_filter_includefields(self):
"""
Check include_fields filtering action at a prospector level
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
prospector_processors=[{
"include_fields": {
"fields": ["offset"],
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "message" not in output
assert "offset" in output
76 changes: 76 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,3 +1319,79 @@ def test_ignore_older_state_clean_inactive(self):

data = self.get_registry()
assert len(data) == 0

def test_registrar_files_with_prospector_level_processors(self):
"""
Check that multiple files are put into registrar file with drop event processor
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
prospector_processors=[{
"drop_event": {},
}]
)
os.mkdir(self.working_dir + "/log/")

testfile_path1 = self.working_dir + "/log/test1.log"
testfile_path2 = self.working_dir + "/log/test2.log"
file1 = open(testfile_path1, 'w')
file2 = open(testfile_path2, 'w')

iterations = 5
for _ in range(0, iterations):
file1.write("hello world") # 11 chars
file1.write("\n") # 1 char
file2.write("goodbye world") # 11 chars
file2.write("\n") # 1 char

file1.close()
file2.close()

filebeat = self.start_beat()

# wait until the registry file exist. Needed to avoid a race between
# the logging and actual writing the file. Seems to happen on Windows.
self.wait_until(
lambda: os.path.isfile(os.path.join(self.working_dir,
"registry")),
max_timeout=10)

# Wait a momemt to make sure registry is completely written
time.sleep(2)

filebeat.check_kill_and_wait()

# Check that file exist
data = self.get_registry()

# Check that 2 files are port of the registrar file
assert len(data) == 2

logfile_abs_path = os.path.abspath(testfile_path1)
record = self.get_registry_entry_by_path(logfile_abs_path)

self.assertDictContainsSubset({
"source": logfile_abs_path,
"offset": iterations * (len("hello world") + len(os.linesep)),
}, record)
self.assertTrue("FileStateOS" in record)
file_state_os = record["FileStateOS"]

if os.name == "nt":
# Windows checks
# TODO: Check for IdxHi, IdxLo, Vol in FileStateOS on Windows.
self.assertEqual(len(file_state_os), 3)
elif platform.system() == "SunOS":
stat = os.stat(logfile_abs_path)
self.assertEqual(file_state_os["inode"], stat.st_ino)

# Python does not return the same st_dev value as Golang or the
# command line stat tool so just check that it's present.
self.assertTrue("device" in file_state_os)
else:
stat = os.stat(logfile_abs_path)
self.assertDictContainsSubset({
"inode": stat.st_ino,
"device": stat.st_dev,
}, file_state_os)

0 comments on commit e73bb6b

Please sign in to comment.