diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index ef920c081d..914617b3e4 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -64,14 +64,10 @@ def initialize config_param :multiline_flush_interval, :time, default: nil desc 'Enable the additional watch timer.' config_param :enable_watch_timer, :bool, default: true + desc 'The encoding after conversion of the input.' + config_param :encoding, :string, default: nil desc 'The encoding of the input.' - config_param :encoding, default: nil do |encoding_name| - begin - Encoding.find(encoding_name) - rescue ArgumentError => e - raise ConfigError, e.message - end - end + config_param :from_encoding, :string, default: nil desc 'Add the log path being tailed to records. Specify the field name to be used.' config_param :path_key, :string, default: nil @@ -92,6 +88,7 @@ def configure(conf) configure_parser(conf) configure_tag + configure_encoding @multiline_mode = conf['format'] =~ /multiline/ @receive_handler = if @multiline_mode @@ -117,6 +114,25 @@ def configure_tag end end + def configure_encoding + unless @encoding + if @from_encoding + raise ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter." + end + end + + @encoding = parse_encoding_param(@encoding) if @encoding + @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding + end + + def parse_encoding_param(encoding_name) + begin + Encoding.find(encoding_name) if encoding_name + rescue ArgumentError => e + raise ConfigError, e.message + end + end + def start super @@ -251,7 +267,13 @@ def close_watcher_after_rotate_wait(tw) def flush_buffer(tw) if lb = tw.line_buffer lb.chomp! 
- lb.force_encoding(@encoding) if @encoding + if @encoding + if @from_encoding + lb.encode!(@encoding, @from_encoding) + else + lb.force_encoding(@encoding) + end + end @parser.parse(lb) { |time, record| if time && record tag = if @tag_prefix || @tag_suffix @@ -300,7 +322,13 @@ def receive_lines(lines, tail_watcher) def convert_line_to_event(line, es, tail_watcher) begin line.chomp! # remove \n - line.force_encoding(@encoding) if @encoding + if @encoding + if @from_encoding + line.encode!(@encoding, @from_encoding) + else + line.force_encoding(@encoding) + end + end @parser.parse(line) { |time, record| if time && record record[@path_key] ||= tail_watcher.path unless @path_key.nil? diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 385ead6f16..872f0fdfde 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -71,6 +71,32 @@ def test_configure_encoding end end + def test_configure_from_encoding + # If only specified from_encoding raise ConfigError + assert_raise(Fluent::ConfigError) do + d = create_driver(SINGLE_LINE_CONFIG + 'from_encoding utf-8') + end + + # valid setting + d = create_driver %[ + format /(?<message>.*)/ + read_from_head true + from_encoding utf-8 + encoding utf-8 + ] + assert_equal Encoding::UTF_8, d.instance.from_encoding + + # invalid from_encoding + assert_raise(Fluent::ConfigError) do + d = create_driver %[ + format /(?<message>.*)/ + read_from_head true + from_encoding no-such-encoding + encoding utf-8 + ] + end + end + # TODO: Should using more better approach instead of sleep wait def test_emit @@ -403,6 +429,28 @@ def test_encoding(data) assert_equal(encoding, emits[0][2]['message'].encoding) end + def test_from_encoding + d = create_driver %[ + format /(?<message>.*)/ + read_from_head true + from_encoding cp932 + encoding utf-8 + ] + + d.run do + sleep 1 + + File.open("#{TMP_DIR}/tail.txt", "w:cp932") {|f| + f.puts "\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) + } + sleep 1 
+ end + + emits = d.emits + assert_equal("\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932).encode(Encoding::UTF_8), emits[0][2]['message']) + assert_equal(Encoding::UTF_8, emits[0][2]['message'].encoding) + end + # multiline mode test def test_multiline @@ -507,6 +555,31 @@ def test_multiline_encoding_of_flushed_record(data) end end + def test_multiline_from_encoding_of_flushed_record + d = create_driver %[ + format multiline + format1 /^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>.*))?/ + format_firstline /^[s]/ + multiline_flush_interval 2s + read_from_head true + from_encoding cp932 + encoding utf-8 + ] + + d.run do + sleep 1 + File.open("#{TMP_DIR}/tail.txt", "w:cp932") { |f| + f.puts "s \x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) + } + + sleep 4 + emits = d.emits + assert_equal(1, emits.length) + assert_equal("\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932).encode(Encoding::UTF_8), emits[0][2]['message1']) + assert_equal(Encoding::UTF_8, emits[0][2]['message1'].encoding) + end + end + def test_multiline_with_multiple_formats File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }