Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add receive_buffer_size parameter for in_udp for v0.12 #1788

Merged
merged 1 commit into from
Dec 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add recv_buffer parameter for in_udp for v0.12
  • Loading branch information
Yuki Ito committed Dec 13, 2017
commit d6656e3fb95b1c30c9b59f51e10f46b61d070305
4 changes: 3 additions & 1 deletion lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class UdpInput < SocketUtil::BaseInput
config_param :message_length_limit, :size, default: 4096
desc "Remove newline from the end of incoming payload"
config_param :remove_newline, :bool, default: true
desc "The max size of socket receive buffer. SO_RCVBUF"
config_param :receive_buffer_size, :size, default: nil

def configure(conf)
super
Expand All @@ -39,7 +41,7 @@ def listen(callback)
log.info "listening udp socket on #{@bind}:#{@port}"
@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
SocketUtil::UdpHandler.new(@usock, log, @message_length_limit, callback, !!@source_hostname_key, @remove_newline)
SocketUtil::UdpHandler.new(@usock, log, @message_length_limit, callback, !!@source_hostname_key, @remove_newline, @receive_buffer_size)
end
end
end
5 changes: 4 additions & 1 deletion lib/fluent/plugin/socket_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ def create_udp_socket(host)
module_function :create_udp_socket

class UdpHandler < Coolio::IO
def initialize(io, log, body_size_limit, callback, resolve_hostname = false, remove_newline = true)
def initialize(io, log, body_size_limit, callback, resolve_hostname = false, remove_newline = true, receive_buffer_size = nil)
super(io)
if io.is_a?(UDPSocket) && receive_buffer_size
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, receive_buffer_size)
end
@io = io
@io.do_not_reverse_lookup = !resolve_hostname
@log = log
Expand Down
34 changes: 34 additions & 0 deletions test/plugin/test_in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def test_configure
assert_equal PORT, d.instance.port
assert_equal k, d.instance.bind
assert_equal 4096, d.instance.message_length_limit
assert_equal nil, d.instance.receive_buffer_size
}
end

Expand Down Expand Up @@ -124,6 +125,39 @@ def test_time_format
compare_test_result(d.emits, tests)
end

test "receive_buffer_size" do
# don't check exact value because it depends on platform and condition

# check if default socket and in_udp's one without receive_buffer_size have same size buffer
d0 = create_driver(BASE_CONFIG + %!
format none
!)
d0.run do
begin
sock = d0.instance.instance_variable_get(:@handler).instance_variable_get(:@io)
default_sock = UDPSocket.new
assert_equal(default_sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int, sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int)
ensure
default_sock.close
end
end

# check if default socket and in_udp's one with receive_buffer_size have different size buffer
d1 = create_driver(BASE_CONFIG + %!
format none
receive_buffer_size 1001
!)
d1.run do
sock = d1.instance.instance_variable_get(:@handler).instance_variable_get(:@io)
begin
default_sock = UDPSocket.new
assert_not_equal(default_sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int, sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int)
ensure
default_sock.close
end
end
end

def compare_test_result(emits, tests)
assert_equal(2, emits.size)
emits.each_index {|i|
Expand Down