From 49ff636b15cf88bd0aa10f8f2eef2819c829332f Mon Sep 17 00:00:00 2001 From: jitran Date: Tue, 17 Jan 2017 10:43:58 +1100 Subject: [PATCH 1/4] Add a new option for in_tail called 'enable_catch_all'. When set to true, all singular and multiline log lines that don't match the parser regex will be stored in an attribute called parse_fail, and eventually saved in the output destination. --- lib/fluent/plugin/in_tail.rb | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index e8cb94dada..41aab35cad 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -65,6 +65,8 @@ def initialize config_param :read_lines_limit, :integer, default: 1000 desc 'The interval of flushing the buffer for multiline format' config_param :multiline_flush_interval, :time, default: nil + desc 'Enable the option to capture unmatched lines.' + config_param :enable_catch_all, :bool, default: false desc 'Enable the additional watch timer.' config_param :enable_watch_timer, :bool, default: true desc 'The encoding after conversion of the input.' @@ -357,6 +359,11 @@ def convert_line_to_event(line, es, tail_watcher) record[@path_key] ||= tail_watcher.path unless @path_key.nil? es.add(time, record) else + if @enable_catch_all + record = {'parse_fail' => line} + record[@path_key] ||= tail_watcher.path unless @path_key.nil? + es.add(Fluent::EventTime.now, record) + end log.warn "pattern not match: #{line.inspect}" end } @@ -387,6 +394,9 @@ def parse_multilines(lines, tail_watcher) lb = line else if lb.nil? 
+ if @enable_catch_all + convert_line_to_event(line, es, tail_watcher) + end log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}" else lb << line From aba4b736ae7ac9a9dc433b0ca75d09c3d507a54c Mon Sep 17 00:00:00 2001 From: jitran Date: Tue, 17 Jan 2017 11:20:18 +1100 Subject: [PATCH 2/4] Use Time.now.to_i as it is backwards compatible with earlier versions of fluentd --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 41aab35cad..44d39b463e 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -362,7 +362,7 @@ def convert_line_to_event(line, es, tail_watcher) if @enable_catch_all record = {'parse_fail' => line} record[@path_key] ||= tail_watcher.path unless @path_key.nil? - es.add(Fluent::EventTime.now, record) + es.add(Time.now.to_i, record) end log.warn "pattern not match: #{line.inspect}" end From eba4c61a25c4ea30f24a1e145398c7cb20318c21 Mon Sep 17 00:00:00 2001 From: jitran Date: Fri, 20 Jan 2017 11:24:53 +1100 Subject: [PATCH 3/4] Renamed enable_catch_all param to emit_unmatched_lines for clarity, and added tests for the new behaviour --- lib/fluent/plugin/in_tail.rb | 10 +++---- test/plugin/test_in_tail.rb | 51 ++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 44d39b463e..c7f36a19fe 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -65,8 +65,8 @@ def initialize config_param :read_lines_limit, :integer, default: 1000 desc 'The interval of flushing the buffer for multiline format' config_param :multiline_flush_interval, :time, default: nil - desc 'Enable the option to capture unmatched lines.' - config_param :enable_catch_all, :bool, default: false + desc 'Enable the option to emit unmatched lines.' 
+ config_param :emit_unmatched_lines, :bool, default: false desc 'Enable the additional watch timer.' config_param :enable_watch_timer, :bool, default: true desc 'The encoding after conversion of the input.' @@ -359,8 +359,8 @@ def convert_line_to_event(line, es, tail_watcher) record[@path_key] ||= tail_watcher.path unless @path_key.nil? es.add(time, record) else - if @enable_catch_all - record = {'parse_fail' => line} + if @emit_unmatched_lines + record = {'unmatched_line' => line} record[@path_key] ||= tail_watcher.path unless @path_key.nil? es.add(Time.now.to_i, record) end @@ -394,7 +394,7 @@ def parse_multilines(lines, tail_watcher) lb = line else if lb.nil? - if @enable_catch_all + if @emit_unmatched_lines convert_line_to_event(line, es, tail_watcher) end log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}" diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 131f4ffda1..2f688131ec 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -149,6 +149,29 @@ def test_emit(data) assert_equal(1, d.emit_count) end + data(flat: config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true }), + parse: config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true })) + def test_emit_with_emit_unmatched_lines_true(data) + config = data + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + + d = create_driver(config) + + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + f.puts "test line 1" + f.puts "test line 2" + f.puts "bad line 1" + } + end + + events = d.events + assert_equal(3, events.length) + assert_equal({"message" => "test line 1"}, events[0][2]) + assert_equal({"message" => "test line 2"}, events[1][2]) + assert_equal({"unmatched_line" => "bad line 1"}, events[2][2]) + end + data('flat 1' => [:flat, 1, 2], 'flat 10' => [:flat, 10, 1], 'parse 1' => [:parse, 1, 2], @@ -516,6 +539,34 @@ def test_multiline(data) 
assert_equal({"message1" => "test8"}, events[3][2]) end + data(flat: MULTILINE_CONFIG + config_element("", "", { "emit_unmatched_lines" => true }), + parse: PARSE_MULTILINE_CONFIG + config_element("", "", { "emit_unmatched_lines" => true }),) + def test_multiline_with_emit_unmatched_lines_true(data) + config = data + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + d = create_driver(config) + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "f test1" + f.puts "s test2" + f.puts "f test3" + f.puts "f test4" + f.puts "s test5" + f.puts "s test6" + f.puts "f test7" + f.puts "s test8" + } + end + + events = d.events + assert_equal(5, events.length) + assert_equal({"unmatched_line" => "f test1"}, events[0][2]) + assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, events[1][2]) + assert_equal({"message1" => "test5"}, events[2][2]) + assert_equal({"message1" => "test6", "message2" => "test7"}, events[3][2]) + assert_equal({"message1" => "test8"}, events[4][2]) + end + data(flat: MULTILINE_CONFIG, parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_flush_interval(data) From d874db165ee32c67b22e112948189ab2924f3775 Mon Sep 17 00:00:00 2001 From: jitran Date: Wed, 25 Jan 2017 12:59:00 +1100 Subject: [PATCH 4/4] Deprecated Time.now.to_i in favor of Fluent::EventTime.now, and updated unit tests --- lib/fluent/plugin/in_tail.rb | 2 +- test/plugin/test_in_tail.rb | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index c7f36a19fe..625ed803c8 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -362,7 +362,7 @@ def convert_line_to_event(line, es, tail_watcher) if @emit_unmatched_lines record = {'unmatched_line' => line} record[@path_key] ||= tail_watcher.path unless @path_key.nil? 
- es.add(Time.now.to_i, record) + es.add(Fluent::EventTime.now, record) end log.warn "pattern not match: #{line.inspect}" end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 2f688131ec..4621032949 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -149,27 +149,26 @@ def test_emit(data) assert_equal(1, d.emit_count) end - data(flat: config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true }), - parse: config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true })) - def test_emit_with_emit_unmatched_lines_true(data) - config = data + def test_emit_with_emit_unmatched_lines_true + config = config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true }) File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } d = create_driver(config) - d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test line 1" f.puts "test line 2" f.puts "bad line 1" + f.puts "test line 3" } end events = d.events - assert_equal(3, events.length) + assert_equal(4, events.length) assert_equal({"message" => "test line 1"}, events[0][2]) assert_equal({"message" => "test line 2"}, events[1][2]) assert_equal({"unmatched_line" => "bad line 1"}, events[2][2]) + assert_equal({"message" => "test line 3"}, events[3][2]) end data('flat 1' => [:flat, 1, 2], @@ -539,11 +538,12 @@ def test_multiline(data) assert_equal({"message1" => "test8"}, events[3][2]) end - data(flat: MULTILINE_CONFIG + config_element("", "", { "emit_unmatched_lines" => true }), - parse: PARSE_MULTILINE_CONFIG + config_element("", "", { "emit_unmatched_lines" => true }),) + data(flat: MULTILINE_CONFIG, + parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_emit_unmatched_lines_true(data) - config = data + config = data + config_element("", "", { "emit_unmatched_lines" => true }) File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + d = create_driver(config) d.run(expect_emits: 1) do 
File.open("#{TMP_DIR}/tail.txt", "ab") { |f|