From 98ca682f787767cfe573ab6eb39c23a99732c71c Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 17 Feb 2016 09:13:15 +0900 Subject: [PATCH 1/2] in_forward: Add source_host_key parameter. fix #804 --- lib/fluent/plugin/in_forward.rb | 46 +++++++++++++++++-------- test/plugin/test_in_forward.rb | 60 +++++++++++++++++++++++++++++---- 2 files changed, 86 insertions(+), 20 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index ca0a2a7191..b6b7d1370e 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -49,6 +49,8 @@ def initialize config_param :chunk_size_limit, :size, default: nil desc 'Skip an event if incoming event is invalid.' config_param :skip_invalid_event, :bool, default: false + desc "The field name of the client's hostname." + config_param :source_host_key, :string, default: nil def configure(conf) super @@ -145,7 +147,7 @@ def run # 3: object record # 4: object option (optional) # } - def on_message(msg, chunk_size, source) + def on_message(msg, chunk_size, peeraddr) if msg.nil? # for future TCP heartbeat_request return @@ -153,7 +155,7 @@ def on_message(msg, chunk_size, source) # TODO: raise an exception if broken chunk is generated by recoverable situation unless msg.is_a?(Array) - log.warn "incoming chunk is broken:", source: source, msg: msg + log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg return end @@ -161,10 +163,10 @@ def on_message(msg, chunk_size, source) entries = msg[1] if @chunk_size_limit && (chunk_size > @chunk_size_limit) - log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source, limit: @chunk_size_limit, size: chunk_size + log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size return elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit) - log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source, limit: @chunk_size_warn_limit, size: chunk_size + log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size end if entries.class == String @@ -172,13 +174,14 @@ def on_message(msg, chunk_size, source) option = msg[2] size = (option && option['size']) || 0 es = MessagePackEventStream.new(entries, nil, size.to_i) - es = check_and_skip_invalid_event(tag, es, source) if @skip_invalid_event + es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event + es = add_source_host(es, peeraddr[2]) if @source_host_key router.emit_stream(tag, es) elsif entries.class == Array # Forward es = if @skip_invalid_event - check_and_skip_invalid_event(tag, entries, source) + check_and_skip_invalid_event(tag, entries, peeraddr) else es = MultiEventStream.new entries.each { |e| @@ -190,6 +193,7 @@ def on_message(msg, chunk_size, source) } es end + es = add_source_host(es, peeraddr[2]) if @source_host_key router.emit_stream(tag, es) option = msg[2] @@ -198,11 +202,12 @@ def on_message(msg, chunk_size, source) time = msg[1] record = msg[2] if @skip_invalid_event && invalid_event?(tag, time, record) - log.warn "got invalid event and drop it:", source: source, tag: tag, time: time, record: record + log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record return msg[3] # retry never succeeded so return ack and drop incoming event. end return if record.nil? time = Engine.now if time.to_i == 0 + record[@source_host_key] = peeraddr[2] if @source_host_key router.emit(tag, time, record) option = msg[3] end @@ -215,11 +220,11 @@ def invalid_event?(tag, time, record) !((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String)) end - def check_and_skip_invalid_event(tag, es, source) + def check_and_skip_invalid_event(tag, es, peeraddr) new_es = MultiEventStream.new es.each { |time, record| if invalid_event?(tag, time, record) - log.warn "skip invalid event:", source: source, tag: tag, time: time, record: record + log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record next end new_es.add(time, record) @@ -227,16 +232,29 @@ def check_and_skip_invalid_event(tag, es, source) new_es end + def add_source_host(es, host) + new_es = MultiEventStream.new + es.each { |time, record| + record[@source_host_key] = host + new_es.add(time, record) + } + new_es + end + + def source_message(peeraddr) + _, port, host, addr = peeraddr + "host: #{host}, addr: #{addr}, port: #{port}" + end + class Handler < Coolio::Socket PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"] def initialize(io, linger_timeout, log, on_message) super(io) + @peeraddr = nil if io.is_a?(TCPSocket) # for unix domain socket support in the future - _proto, port, host, addr = ( io.peeraddr rescue PEERADDR_FAILED ) - @source = "host: #{host}, addr: #{addr}, port: #{port}" - + @peeraddr = (io.peeraddr rescue PEERADDR_FAILED) opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; } io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) end @@ -265,7 +283,7 @@ def on_read(data) @serializer = :to_json.to_proc @y = Yajl::Parser.new @y.on_parse_complete = lambda { |obj| - option = @on_message.call(obj, @chunk_counter, @source) + option = @on_message.call(obj, @chunk_counter, @peeraddr) respond option @chunk_counter = 0 } @@ -293,7 +311,7 @@ def on_read_json(data) def on_read_msgpack(data) @chunk_counter += data.bytesize @u.feed_each(data) do |obj| - option = @on_message.call(obj, @chunk_counter, @source) + option = @on_message.call(obj, @chunk_counter, @peeraddr) respond option if option @chunk_counter = 0 end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 815b69556d..98eacaa45e 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -29,6 +29,7 @@ def setup port #{PORT} bind 127.0.0.1 ] + PEERADDR = ['?', '0000', '127.0.0.1', '127.0.0.1'] def create_driver(conf=CONFIG) Fluent::Test::InputTestDriver.new(Fluent::ForwardInput).configure(conf) @@ -260,7 +261,7 @@ def test_set_size_to_option d.run do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - option = d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") + option = d.instance.send(:on_message, obj, chunk.size, PEERADDR) assert_equal option['size'], events.length end end @@ -282,7 +283,7 @@ def test_send_large_chunk_warning d.run do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") + d.instance.send(:on_message, obj, chunk.size, PEERADDR) end end @@ -311,7 +312,7 @@ def test_send_large_chunk_only_warning d.run do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") + d.instance.send(:on_message, obj, chunk.size, PEERADDR) end end @@ -338,7 +339,7 @@ def test_send_large_chunk_limit # d.run => send_data d.run do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") + d.instance.send(:on_message, obj, chunk.size, PEERADDR) end end @@ -360,7 +361,7 @@ def test_send_broken_chunk(data) # d.run => send_data d.run do - d.instance.send(:on_message, data, 1000000000, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") + d.instance.send(:on_message, data, 1000000000, PEERADDR) end # check emitted data @@ -476,7 +477,6 @@ def test_respond_to_message_json_requiring_ack assert_equal events, d.emits assert_equal expected_acks, @responses.map { |res| JSON.parse(res)['ack'] } - end def test_not_respond_to_message_not_requiring_ack @@ -583,5 +583,53 @@ def send_data(data, try_to_receive_response=false, response_timeout=1) @responses << res if try_to_receive_response end + # TODO: Use sub_test_case. Currently Errno::EADDRINUSE happens inside sub_test_case + test 'message protocol with source_host_key' do + execute_test { |events| + events.each { |tag, time, record| + send_data [tag, time, record].to_msgpack + } + } + end + + test 'forward protocol with source_host_key' do + execute_test { |events| + entries = [] + events.each {|tag,time,record| + entries << [time, record] + } + send_data ['tag1', entries].to_msgpack + } + end + + test 'packed forward protocol with source_host_key' do + execute_test { |events| + entries = '' + events.each { |tag, time, record| + Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush + } + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s + } + end + + def execute_test(&block) + d = create_driver(CONFIG + 'source_host_key source') + + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + events = [ + ["tag1", time, {"a"=>1}], + ["tag1", time, {"a"=>2}] + ] + d.expected_emits_length = events.length + + d.run do + block.call(events) + end + + d.emits.each { |tag, _time, record| + assert_true record.has_key?('source') + } + end + # TODO heartbeat end From 180d5f502a08e0195338b71cd3ccce9e568a0fa0 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 10 Aug 2016 17:55:10 +0900 Subject: [PATCH 2/2] Rename to clear parameter name --- lib/fluent/plugin/in_forward.rb | 10 +++++----- test/plugin/test_in_forward.rb | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index b6b7d1370e..daf75fd92b 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -50,7 +50,7 @@ def initialize desc 'Skip an event if incoming event is invalid.' config_param :skip_invalid_event, :bool, default: false desc "The field name of the client's hostname." - config_param :source_host_key, :string, default: nil + config_param :source_hostname_key, :string, default: nil def configure(conf) super @@ -175,7 +175,7 @@ def on_message(msg, chunk_size, peeraddr) size = (option && option['size']) || 0 es = MessagePackEventStream.new(entries, nil, size.to_i) es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event - es = add_source_host(es, peeraddr[2]) if @source_host_key + es = add_source_host(es, peeraddr[2]) if @source_hostname_key router.emit_stream(tag, es) elsif entries.class == Array @@ -193,7 +193,7 @@ def on_message(msg, chunk_size, peeraddr) } es end - es = add_source_host(es, peeraddr[2]) if @source_host_key + es = add_source_host(es, peeraddr[2]) if @source_hostname_key router.emit_stream(tag, es) option = msg[2] @@ -207,7 +207,7 @@ def on_message(msg, chunk_size, peeraddr) end return if record.nil? time = Engine.now if time.to_i == 0 - record[@source_host_key] = peeraddr[2] if @source_host_key + record[@source_hostname_key] = peeraddr[2] if @source_hostname_key router.emit(tag, time, record) option = msg[3] end @@ -235,7 +235,7 @@ def check_and_skip_invalid_event(tag, es, peeraddr) def add_source_host(es, host) new_es = MultiEventStream.new es.each { |time, record| - record[@source_host_key] = host + record[@source_hostname_key] = host new_es.add(time, record) } new_es diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 98eacaa45e..b765946db4 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -584,7 +584,7 @@ def send_data(data, try_to_receive_response=false, response_timeout=1) end # TODO: Use sub_test_case. Currently Errno::EADDRINUSE happens inside sub_test_case - test 'message protocol with source_host_key' do + test 'message protocol with source_hostname_key' do execute_test { |events| events.each { |tag, time, record| send_data [tag, time, record].to_msgpack @@ -592,7 +592,7 @@ def send_data(data, try_to_receive_response=false, response_timeout=1) } end - test 'forward protocol with source_host_key' do + test 'forward protocol with source_hostname_key' do execute_test { |events| entries = [] events.each {|tag,time,record| @@ -602,7 +602,7 @@ def send_data(data, try_to_receive_response=false, response_timeout=1) } end - test 'packed forward protocol with source_host_key' do + test 'packed forward protocol with source_hostname_key' do execute_test { |events| entries = '' events.each { |tag, time, record| @@ -613,7 +613,7 @@ def send_data(data, try_to_receive_response=false, response_timeout=1) end def execute_test(&block) - d = create_driver(CONFIG + 'source_host_key source') + d = create_driver(CONFIG + 'source_hostname_key source') time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") events = [