Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter out if #filter method returns nil record #515

Merged
merged 2 commits into from
Dec 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/fluent/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ def shutdown
end

def filter(tag, time, record)
raise NotImplementedError, "Implement this method in child class"
end

def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each { |time, record|
begin
new_es.add(time, filter(tag, time, record))
filtered_record = filter(tag, time, record)
new_es.add(time, filtered_record) if filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
end
Expand Down
16 changes: 8 additions & 8 deletions lib/fluent/plugin/filter_grep.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,23 @@ def configure(conf)
end
end

def filter_stream(tag, es)
result_es = MultiEventStream.new
es.each do |time, record|
def filter(tag, time, record)
result = nil
begin
catch(:break_loop) do
@regexps.each do |key, regexp|
throw :break_loop unless match(regexp, record[key].to_s)
end
@excludes.each do |key, exclude|
throw :break_loop if match(exclude, record[key].to_s)
end
result_es.add(time, record)
result = record
end
rescue => e
log.warn "failed to grep events", :error_class => e.class, :error => e.message
log.warn_backtrace
end
result_es
rescue => e
log.warn "failed to grep events", :error_class => e.class, :error => e.message
log.warn_backtrace
result
end

private
Expand Down
80 changes: 80 additions & 0 deletions test/test_filter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
require_relative 'helper'
require 'fluent/filter'

class FilterTest < Test::Unit::TestCase
include Fluent

setup do
Fluent::Test.setup
@time = Fluent::Engine.now
end

def create_driver(klass = Fluent::Filter, conf = '')
Test::FilterTestDriver.new(klass).configure(conf, true)
end

def emit(klass, msgs, conf = '')
es = Fluent::MultiEventStream.new
msgs.each {|msg|
es.add(@time, {'message' => msg})
}

d = create_driver(klass, conf)
d.filter_stream('filter.test', es)
end

sub_test_case 'configure' do
test 'check default' do
assert_nothing_raised { create_driver }
end
end

sub_test_case 'filter' do
test 'NotImplementedError' do
not_implemented_filter = Class.new(Fluent::Filter)
assert_raise(NotImplementedError) { emit(not_implemented_filter, ['foo']) }
end

test 'null filter' do
null_filter = Class.new(Fluent::Filter) do |c|
def filter(tag, time, record)
nil
end
end
es = emit(null_filter, ['foo'])
assert_equal(0, es.instance_variable_get(:@record_array).size)
end

test 'pass filter' do
pass_filter = Class.new(Fluent::Filter) do |c|
def filter(tag, time, record)
record
end
end
es = emit(pass_filter, ['foo'])
assert_equal(1, es.instance_variable_get(:@record_array).size)
end
end

sub_test_case 'filter_stream' do
test 'null filter' do
null_filter = Class.new(Fluent::Filter) do |c|
def filter_stream(tag, es)
MultiEventStream.new
end
end
es = emit(null_filter, ['foo'])
assert_equal(0, es.instance_variable_get(:@record_array).size)
end

test 'pass filter' do
pass_filter = Class.new(Fluent::Filter) do |c|
def filter_stream(tag, es)
es
end
end
es = emit(pass_filter, ['foo'])
assert_equal(1, es.instance_variable_get(:@record_array).size)
end
end
end