Skip to content

Commit

Permalink
Merge pull request #611 from fluent/check-message-type-in-forward
Browse files Browse the repository at this point in the history
Add chunk format check in in_forward
  • Loading branch information
repeatedly committed Jun 12, 2015
2 parents 7bd529c + d39c532 commit 4e844ae
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
7 changes: 6 additions & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ def on_message(msg, chunk_size, source)
return
end

# TODO format error
# TODO: raise an exception if broken chunk is generated by recoverable situation
unless msg.is_a?(Array)
log.warn "incoming chunk is broken:", source: source, msg: msg
return
end

tag = msg[0].to_s
entries = msg[1]

Expand Down
20 changes: 20 additions & 0 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,26 @@ def test_send_large_chunk_limit
}.size == 1, "large chunk warning is not logged"
end

data('string chunk' => 'broken string',
'integer chunk' => 10)
def test_send_broken_chunk(data)
d = create_driver

# d.run => send_data
d.run do
d.instance.send(:on_message, data, 1000000000, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
end

# check emitted data
emits = d.emits
assert_equal 0, emits.size

# check log
assert d.instance.log.logs.select{|line|
line =~ / \[warn\]: incoming chunk is broken: source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" msg=#{data.inspect}/
}.size == 1, "should not accept broken chunk"
end

def test_respond_to_message_requiring_ack
d = create_driver

Expand Down

0 comments on commit 4e844ae

Please sign in to comment.