diff --git a/test/plugin/out_forward/test_ack_handler.rb b/test/plugin/out_forward/test_ack_handler.rb new file mode 100644 index 0000000000..61c895f568 --- /dev/null +++ b/test/plugin/out_forward/test_ack_handler.rb @@ -0,0 +1,95 @@ +require_relative '../../helper' +require 'fluent/test/driver/output' +require 'flexmock/test_unit' + +require 'fluent/plugin/out_forward' +require 'fluent/plugin/out_forward/ack_handler' + +class AckHandlerTest < Test::Unit::TestCase + data( + 'chunk_id is matched' => [MessagePack.pack({ 'ack' => Base64.encode64('chunk_id 111') }), Fluent::Plugin::ForwardOutput::AckHandler::Result::SUCCESS], + 'chunk_id is not matched' => [MessagePack.pack({ 'ack' => 'unmatched' }), Fluent::Plugin::ForwardOutput::AckHandler::Result::CHUNKID_UNMATCHED], + 'chunk_id is empty' => ['', Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED], + ) + test 'returns node, sock and result status' do |args| + receved, state = args + ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(10, log: $log, read_length: 100) + r, w = IO.pipe + + node = flexmock('node', host: '127.0.0.1', port: '1000') # for log + chunk_id = 'chunk_id 111' + ack = ack_handler.create_ack(chunk_id, node) + + w.write(chunk_id) + mock(r).recv(anything) { |_| receved } # IO does not have recv + ack.enqueue(r) + + a1 = a2 = a3 = a4 = nil + ack_handler.ack_reader(1) do |cid, n, s, ret| + # This block is rescued by ack_handler so it needs to invoke assetion outside of this block + a1 = cid; a2 = n; a3 = s; a4 = ret + end + + assert_equal chunk_id, a1 + assert_equal node, a2 + assert_equal r, a3 + assert_equal state, a4 + ensure + r.close rescue nil + w.close rescue nil + end + + test 'returns nil if raise an error' do + ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(10, log: $log, read_length: 100) + r, w = IO.pipe + + node = flexmock('node', host: '127.0.0.1', port: '1000') # for log + chunk_id = 'chunk_id 111' + ack = ack_handler.create_ack(chunk_id, node) + + w.write(chunk_id) + mock(r).recv(anything) { |_| raise 'unexpected error' } # IO does not have recv + ack.enqueue(r) + + a1 = a2 = a3 = a4 = nil + ack_handler.ack_reader(1) do |cid, n, s, ret| + # This block is rescued by ack_handler so it needs to invoke assetion outside of this block + a1 = cid; a2 = n; a3 = s; a4 = ret + end + + assert_equal nil, a1 + assert_equal nil, a2 + assert_equal nil, a3 + assert_equal Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED, a4 + ensure + r.close rescue nil + w.close rescue nil + end + + test 'when ack is expired' do + ack_handler = Fluent::Plugin::ForwardOutput::AckHandler.new(0, log: $log, read_length: 100) + r, w = IO.pipe + + node = flexmock('node', host: '127.0.0.1', port: '1000') # for log + chunk_id = 'chunk_id 111' + ack = ack_handler.create_ack(chunk_id, node) + + w.write(chunk_id) + mock(r).recv(anything).never + ack.enqueue(r) + + a1 = a2 = a3 = a4 = nil + ack_handler.ack_reader(1) do |cid, n, s, ret| + # This block is rescued by ack_handler so it needs to invoke assetion outside of this block + a1 = cid; a2 = n; a3 = s; a4 = ret + end + + assert_equal chunk_id, a1 + assert_equal node, a2 + assert_equal r, a3 + assert_equal Fluent::Plugin::ForwardOutput::AckHandler::Result::FAILED, a4 + ensure + r.close rescue nil + w.close rescue nil + end +end