diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index 14a71c1bcf..d8ffa00773 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -28,6 +28,8 @@ class UdpInput < SocketUtil::BaseInput config_param :message_length_limit, :size, default: 4096 desc "Remove newline from the end of incoming payload" config_param :remove_newline, :bool, default: true + desc "The max size of socket receive buffer. SO_RCVBUF" + config_param :receive_buffer_size, :size, default: nil def configure(conf) super @@ -39,7 +41,7 @@ def listen(callback) log.info "listening udp socket on #{@bind}:#{@port}" @usock = SocketUtil.create_udp_socket(@bind) @usock.bind(@bind, @port) - SocketUtil::UdpHandler.new(@usock, log, @message_length_limit, callback, !!@source_hostname_key, @remove_newline) + SocketUtil::UdpHandler.new(@usock, log, @message_length_limit, callback, !!@source_hostname_key, @remove_newline, @receive_buffer_size) end end end diff --git a/lib/fluent/plugin/socket_util.rb b/lib/fluent/plugin/socket_util.rb index d1890a3684..f52dcd8db3 100644 --- a/lib/fluent/plugin/socket_util.rb +++ b/lib/fluent/plugin/socket_util.rb @@ -32,8 +32,11 @@ def create_udp_socket(host) module_function :create_udp_socket class UdpHandler < Coolio::IO - def initialize(io, log, body_size_limit, callback, resolve_hostname = false, remove_newline = true) + def initialize(io, log, body_size_limit, callback, resolve_hostname = false, remove_newline = true, receive_buffer_size = nil) super(io) + if io.is_a?(UDPSocket) && receive_buffer_size + io.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, receive_buffer_size) + end @io = io @io.do_not_reverse_lookup = !resolve_hostname @log = log diff --git a/test/plugin/test_in_udp.rb b/test/plugin/test_in_udp.rb index 813fd91a11..171e8ff3d7 100755 --- a/test/plugin/test_in_udp.rb +++ b/test/plugin/test_in_udp.rb @@ -33,6 +33,7 @@ def test_configure assert_equal PORT, d.instance.port assert_equal k, d.instance.bind assert_equal 4096, d.instance.message_length_limit + assert_equal nil, d.instance.receive_buffer_size } end @@ -124,6 +125,39 @@ def test_time_format compare_test_result(d.emits, tests) end + test "receive_buffer_size" do + # don't check exact value because it depends on platform and condition + + # check if default socket and in_udp's one without receive_buffer_size have same size buffer + d0 = create_driver(BASE_CONFIG + %! + format none + !) + d0.run do + begin + sock = d0.instance.instance_variable_get(:@handler).instance_variable_get(:@io) + default_sock = UDPSocket.new + assert_equal(default_sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int, sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int) + ensure + default_sock.close + end + end + + # check if default socket and in_udp's one with receive_buffer_size have different size buffer + d1 = create_driver(BASE_CONFIG + %! + format none + receive_buffer_size 1001 + !) + d1.run do + sock = d1.instance.instance_variable_get(:@handler).instance_variable_get(:@io) + begin + default_sock = UDPSocket.new + assert_not_equal(default_sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int, sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int) + ensure + default_sock.close + end + end + end + def compare_test_result(emits, tests) assert_equal(2, emits.size) emits.each_index {|i|