diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 4acfd3178d..53edec4461 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -39,6 +39,8 @@ class ConnectionClosedError < Error; end desc 'The timeout time when sending event logs.' config_param :send_timeout, :time, default: 60 + desc 'The timeout time for socket connect' + config_param :connect_timeout, :time, default: nil # TODO: add linger_timeout, recv_timeout desc 'The protocol to use for heartbeats (default is the same with "transport").' @@ -376,6 +378,7 @@ def create_transfer_socket(host, port, hostname, &block) linger_timeout: Fluent.windows? ? nil : @send_timeout, send_timeout: @send_timeout, recv_timeout: @ack_response_timeout, + connect_timeout: @connect_timeout, &block ) when :tcp @@ -384,6 +387,7 @@ def create_transfer_socket(host, port, hostname, &block) linger_timeout: @send_timeout, send_timeout: @send_timeout, recv_timeout: @ack_response_timeout, + connect_timeout: @connect_timeout, &block ) else diff --git a/lib/fluent/plugin_helper/socket.rb b/lib/fluent/plugin_helper/socket.rb index 09ffd3168f..f38e6972ea 100644 --- a/lib/fluent/plugin_helper/socket.rb +++ b/lib/fluent/plugin_helper/socket.rb @@ -55,8 +55,10 @@ def socket_create(proto, host, port, **kwargs, &block) end end - def socket_create_tcp(host, port, resolve_name: false, **kwargs, &block) - sock = WrappedSocket::TCP.new(host, port) + def socket_create_tcp(host, port, resolve_name: false, connect_timeout: nil, **kwargs, &block) + sock = ::Socket.tcp(host, port, connect_timeout: connect_timeout) + sock.autoclose = false # avoid GC triggered close + sock = WrappedSocket::TCP.for_fd(sock.fileno) socket_option_set(sock, resolve_name: resolve_name, **kwargs) if block begin diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index bddc08dff0..896429da74 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -78,6 +78,7 @@ def read_ack_from_sock(sock, unpacker) assert_equal 60, d.instance.send_timeout assert_equal :transport, d.instance.heartbeat_type assert_equal 1, nodes.length + assert_nil d.instance.connect_timeout node = nodes.first assert_equal "test", node.name assert_equal '127.0.0.1', node.host @@ -102,6 +103,23 @@ def read_ack_from_sock(sock, unpacker) assert_equal( 10*1024*1024, instance.buffer.chunk_limit_size ) end + test 'configure timeouts' do + @d = d = create_driver(%[ + send_timeout 30 + connect_timeout 10 + hard_timeout 15 + ack_response_timeout 20 + + host #{TARGET_HOST} + port #{TARGET_PORT} + + ]) + assert_equal 30, d.instance.send_timeout + assert_equal 10, d.instance.connect_timeout + assert_equal 15, d.instance.hard_timeout + assert_equal 20, d.instance.ack_response_timeout + end + test 'configure_udp_heartbeat' do @d = d = create_driver(CONFIG + "\nheartbeat_type udp") assert_equal :udp, d.instance.heartbeat_type