diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 3c44a93ceb..2bfcee690d 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -125,7 +125,12 @@ def on_message(msg, chunk_size, source) return end - # TODO format error + # TODO: raise an exception if broken chunk is generated by recoverable situation + unless msg.is_a?(Array) + log.warn "incoming chunk is broken:", source: source, msg: msg + return + end + tag = msg[0].to_s entries = msg[1] diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 18c0c1139b..329725f19e 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -200,6 +200,26 @@ def test_send_large_chunk_limit }.size == 1, "large chunk warning is not logged" end + data('string chunk' => 'broken string', + 'integer chunk' => 10) + def test_send_broken_chunk(data) + d = create_driver + + # d.run => send_data + d.run do + d.instance.send(:on_message, data, 1000000000, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") + end + + # check emitted data + emits = d.emits + assert_equal 0, emits.size + + # check log + assert d.instance.log.logs.select{|line| + line =~ / \[warn\]: incoming chunk is broken: source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" msg=#{data.inspect}/ + }.size == 1, "should not accept broken chunk" + end + def test_respond_to_message_requiring_ack d = create_driver