Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prospector level processors #3853

Merged
merged 3 commits into from
Apr 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- close_timeout is also applied when the output is blocking. {pull}3511[3511]
- Improve handling of different path variants on Windows. {pull}3781[3781]
- Restructure input.Event to be inline with outputs.Data {pull}3823[3823]
- Add base for supporting prospector level processors {pull}3853[3853]

*Heartbeat*

Expand Down
36 changes: 19 additions & 17 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/processors"
)

var (
Expand All @@ -26,23 +27,24 @@ var (
)

type prospectorConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Enabled bool `config:"enabled"`
DocumentType string `config:"document_type"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
JSON *reader.JSONConfig `config:"json"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Enabled bool `config:"enabled"`
DocumentType string `config:"document_type"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
JSON *reader.JSONConfig `config:"json"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Processors processors.PluginConfig `config:"processors"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
18 changes: 18 additions & 0 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/processors"
)

var (
Expand All @@ -40,6 +41,7 @@ type Prospector struct {
registry *harvesterRegistry
beatDone chan struct{}
eventCounter *sync.WaitGroup
processors *processors.Processors
}

// Prospectorer is the interface common to all prospectors
Expand Down Expand Up @@ -84,6 +86,13 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*
return nil, err
}

f, err := processors.New(prospector.config.Processors)
if err != nil {
return nil, err
}

prospector.processors = f

logp.Debug("prospector", "File Configs: %v", prospector.config.Paths)

return prospector, nil
Expand Down Expand Up @@ -215,6 +224,15 @@ func (p *Prospector) updateState(event *input.Event) error {
event.Fileset = p.config.Fileset

eventHolder := event.GetData()
//run the filters before sending to spooler
if event.Bytes > 0 {
eventHolder.Event = p.processors.Run(eventHolder.Event)
}

if eventHolder.Event == nil {
eventHolder.Metadata.Bytes = 0
}

ok := p.outlet.OnEvent(&eventHolder)

if !ok {
Expand Down
15 changes: 14 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,20 @@ filebeat.prospectors:
harvester_limit: {{harvester_limit | default(0) }}
symlinks: {{symlinks}}
pipeline: {{pipeline}}

{%- if prospector_processors %}
processors:
{%- for processor in prospector_processors %}
{%- for name, settings in processor.iteritems() %}
- {{name}}:
{%- if settings %}
{%- for k, v in settings.iteritems() %}
{{k}}:
{{v | default([])}}
{%- endfor %}
{%- endif %}
{%- endfor %}
{%- endfor %}
{% endif %}
{% if fields %}
fields:
{% for k,v in fields.items() %}
Expand Down
50 changes: 50 additions & 0 deletions filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,53 @@ def test_harvester_limit(self):
assert len(data) == 3

filebeat.check_kill_and_wait()

def test_prospector_filter_dropfields(self):
"""
Check drop_fields filtering action at a prospector level
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
prospector_processors=[{
"drop_fields": {
"fields": ["offset"],
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "offset" not in output
assert "message" in output

def test_prospector_filter_includefields(self):
"""
Check include_fields filtering action at a prospector level
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
prospector_processors=[{
"include_fields": {
"fields": ["offset"],
},
}]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "message" not in output
assert "offset" in output
76 changes: 76 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1312,3 +1312,79 @@ def test_ignore_older_state_clean_inactive(self):

data = self.get_registry()
assert len(data) == 0

def test_registrar_files_with_prospector_level_processors(self):
"""
Check that multiple files are put into registrar file with drop event processor
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
prospector_processors=[{
"drop_event": {},
}]
)
os.mkdir(self.working_dir + "/log/")

testfile_path1 = self.working_dir + "/log/test1.log"
testfile_path2 = self.working_dir + "/log/test2.log"
file1 = open(testfile_path1, 'w')
file2 = open(testfile_path2, 'w')

iterations = 5
for _ in range(0, iterations):
file1.write("hello world") # 11 chars
file1.write("\n") # 1 char
file2.write("goodbye world") # 11 chars
file2.write("\n") # 1 char

file1.close()
file2.close()

filebeat = self.start_beat()

# wait until the registry file exist. Needed to avoid a race between
# the logging and actual writing the file. Seems to happen on Windows.
self.wait_until(
lambda: os.path.isfile(os.path.join(self.working_dir,
"registry")),
max_timeout=10)

# Wait a momemt to make sure registry is completely written
time.sleep(2)

filebeat.check_kill_and_wait()

# Check that file exist
data = self.get_registry()

# Check that 2 files are port of the registrar file
assert len(data) == 2

logfile_abs_path = os.path.abspath(testfile_path1)
record = self.get_registry_entry_by_path(logfile_abs_path)

self.assertDictContainsSubset({
"source": logfile_abs_path,
"offset": iterations * (len("hello world") + len(os.linesep)),
}, record)
self.assertTrue("FileStateOS" in record)
file_state_os = record["FileStateOS"]

if os.name == "nt":
# Windows checks
# TODO: Check for IdxHi, IdxLo, Vol in FileStateOS on Windows.
self.assertEqual(len(file_state_os), 3)
elif platform.system() == "SunOS":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you run the tests on such a platform? I would assume this could also break some other tests ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i copy pasted this from another test case as i assumed that it was there for a reason.

stat = os.stat(logfile_abs_path)
self.assertEqual(file_state_os["inode"], stat.st_ino)

# Python does not return the same st_dev value as Golang or the
# command line stat tool so just check that it's present.
self.assertTrue("device" in file_state_os)
else:
stat = os.stat(logfile_abs_path)
self.assertDictContainsSubset({
"inode": stat.st_ino,
"device": stat.st_dev,
}, file_state_os)