diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index d89cef55dc..4cc3323369 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -20,10 +20,6 @@ require 'yajl' require 'digest' -require 'fluent/plugin/socket_util' -require 'fcntl' -require 'cool.io' - module Fluent::Plugin class ForwardInput < Input Fluent::Plugin.register_input('forward', self) @@ -31,7 +27,7 @@ class ForwardInput < Input # See the wiki page below for protocol specification # https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1 - helpers :event_loop + helpers :server LISTEN_PORT = 24224 @@ -39,12 +35,15 @@ class ForwardInput < Input config_param :port, :integer, default: LISTEN_PORT desc 'The bind address to listen to.' config_param :bind, :string, default: '0.0.0.0' + config_param :backlog, :integer, default: nil # SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src desc 'The timeout time used to set linger option.' config_param :linger_timeout, :integer, default: 0 # This option is for Cool.io's loop wait timeout to avoid loop stuck at shutdown. Almost users don't need to change this value. config_param :blocking_timeout, :time, default: 0.5 + desc 'Try to resolve hostname from IP addresses or not.' + config_param :resolve_hostname, :bool, default: nil desc 'Connections will be disconnected right after receiving first message if this value is true.' config_param :deny_keepalive, :bool, default: false @@ -91,6 +90,15 @@ class ForwardInput < Input def configure(conf) super + if @source_hostname_key + # TODO: add test + if @resolve_hostname.nil? + @resolve_hostname = true + elsif !@resolve_hostname # user specifies "false" in config + raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key" + end + end + if @security if @security.user_auth && @security.users.empty? raise Fluent::ConfigError, " sections required if user_auth enabled" @@ -131,44 +139,35 @@ def configure(conf) @lsock = @usock = nil end + HEARTBEAT_UDP_PAYLOAD = "\0" + def start super - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + server_create_connection( + :in_forward_server, @port, + bind: @bind, + shared: false, + resolve_name: @resolve_hostname, + linger_timeout: @linger_timeout, + backlog: @backlog, + &method(:handle_connection) + ) + + server_create(:in_forward_server_udp_heartbeat, @port, shared: false, proto: :udp, bind: @bind, resolve_name: @resolve_hostname, max_bytes: 128) do |data, sock| + log.trace "heartbeat udp data arrived", host: sock.remote_host, port: sock.remote_port, data: data + begin + sock.write HEARTBEAT_UDP_PAYLOAD + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + log.trace "error while heartbeat response", host: sock.remote_host, error: e + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - - @lsock = listen(client) - event_loop_attach(@lsock) - - @usock = client.listen_udp(@bind, @port) - @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) - @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request)) - event_loop_attach(@hbr) - end - - def close - @lsock.close if @lsock - @usock.close if @usock - super end - def listen(client) - log.info "listening fluent socket on #{@bind}:#{@port}" - sock = client.listen_tcp(@bind, @port) - s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:handle_connection)) - s.listen(@backlog) unless @backlog.nil? - s - end - - private - def handle_connection(conn) send_data = ->(serializer, data){ conn.write serializer.call(data) } - log.trace "connected fluent socket", address: conn.remote_addr, port: conn.remote_port + log.trace "connected fluent socket", addr: conn.remote_addr, port: conn.remote_port state = :established nonce = nil user_auth_salt = nil @@ -182,7 +181,7 @@ def handle_connection(conn) state = :pingpong end - log.trace "accepted fluent socket", address: conn.remote_addr, port: conn.remote_port + log.trace "accepted fluent socket", addr: conn.remote_addr, port: conn.remote_port read_messages(conn) do |msg, chunk_size, serializer| case state @@ -198,15 +197,11 @@ def handle_connection(conn) log.debug "connection established", address: conn.remote_addr, port: conn.remote_port state = :established when :established - options = on_message(msg, chunk_size, conn.remote_addr) + options = on_message(msg, chunk_size, conn.remote_host) if options && r = response(options) - send_data.call(serializer, r) log.trace "sent response to fluent socket", address: conn.remote_addr, response: r - if @deny_keepalive - conn.on_write_complete do - conn.close - end - end + conn.on_write_complete{ conn.close } if @deny_keepalive + send_data.call(serializer, r) else if @deny_keepalive conn.close @@ -222,7 +217,7 @@ def read_messages(conn, &block) feeder = nil serializer = nil bytes = 0 - conn.on_data do |data| + conn.data do |data| # only for first call of callback unless feeder first = data[0] @@ -258,7 +253,7 @@ def response(option) nil end - def on_message(msg, chunk_size, peeraddr) + def on_message(msg, chunk_size, remote_host) if msg.nil? # for future TCP heartbeat_request return @@ -266,7 +261,7 @@ def on_message(msg, chunk_size, peeraddr) # TODO: raise an exception if broken chunk is generated by recoverable situation unless msg.is_a?(Array) - log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg + log.warn "incoming chunk is broken:", host: remote_host, msg: msg return end @@ -274,10 +269,10 @@ def on_message(msg, chunk_size, peeraddr) entries = msg[1] if @chunk_size_limit && (chunk_size > @chunk_size_limit) - log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size + log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: remote_host, limit: @chunk_size_limit, size: chunk_size return elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit) - log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size + log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: remote_host, limit: @chunk_size_warn_limit, size: chunk_size end case entries @@ -287,14 +282,14 @@ def on_message(msg, chunk_size, peeraddr) size = (option && option['size']) || 0 es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream es = es_class.new(entries, nil, size.to_i) - es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event - es = add_source_host(es, peeraddr[2]) if @source_hostname_key + es = check_and_skip_invalid_event(tag, es, remote_host) if @skip_invalid_event + es = add_source_host(es, remote_host) if @source_hostname_key router.emit_stream(tag, es) when Array # Forward es = if @skip_invalid_event - check_and_skip_invalid_event(tag, entries, peeraddr) + check_and_skip_invalid_event(tag, entries, remote_host) else es = Fluent::MultiEventStream.new entries.each { |e| @@ -306,7 +301,7 @@ def on_message(msg, chunk_size, peeraddr) } es end - es = add_source_host(es, peeraddr[2]) if @source_hostname_key + es = add_source_host(es, remote_host) if @source_hostname_key router.emit_stream(tag, es) option = msg[2] @@ -315,29 +310,31 @@ def on_message(msg, chunk_size, peeraddr) time = msg[1] record = msg[2] if @skip_invalid_event && invalid_event?(tag, time, record) - log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record + log.warn "got invalid event and drop it:", host: remote_host, tag: tag, time: time, record: record return msg[3] # retry never succeeded so return ack and drop incoming event. end return if record.nil? time = Fluent::Engine.now if time.to_i == 0 - record[@source_hostname_key] = peeraddr[2] if @source_hostname_key + record[@source_hostname_key] = remote_host if @source_hostname_key router.emit(tag, time, record) option = msg[3] end # return option for response option + ensure + p(here: "ensure of on_message", error: $!) if $! end def invalid_event?(tag, time, record) !((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String)) end - def check_and_skip_invalid_event(tag, es, peeraddr) + def check_and_skip_invalid_event(tag, es, remote_host) new_es = Fluent::MultiEventStream.new es.each { |time, record| if invalid_event?(tag, time, record) - log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record + log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record next end new_es.add(time, record) @@ -354,11 +351,6 @@ def add_source_host(es, host) new_es end - def source_message(peeraddr) - _, port, host, addr = peeraddr - "host: #{host}, addr: #{addr}, port: #{port}" - end - def select_authenticate_users(node, username) if node.nil? || node[:users].empty? @security.users.select{|u| u.username == username} @@ -424,114 +416,5 @@ def generate_pong(auth_result, reason_or_salt, nonce, shared_key) shared_key_digest_hex = Digest::SHA512.new.update(reason_or_salt).update(@security.self_hostname).update(nonce).update(shared_key).hexdigest ['PONG', true, '', @security.self_hostname, shared_key_digest_hex] end - - class Handler < Coolio::Socket - attr_reader :protocol, :remote_port, :remote_addr, :remote_host - - PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"] - - def initialize(io, linger_timeout, log, on_connect_callback) - super(io) - - @peeraddr = nil - if io.is_a?(TCPSocket) # for unix domain socket support in the future - @peeraddr = (io.peeraddr rescue PEERADDR_FAILED) - opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; } - io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) - end - - ### TODO: disabling name rev resolv - proto, port, host, addr = ( io.peeraddr rescue PEERADDR_FAILED ) - if addr == '?' - port, addr = *Socket.unpack_sockaddr_in(io.getpeername) rescue nil - end - @protocol = proto - @remote_port = port - @remote_addr = addr - @remote_host = host - @writing = false - @closing = false - @mutex = Mutex.new - - @chunk_counter = 0 - @on_connect_callback = on_connect_callback - @log = log - @log.trace { - begin - remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) - rescue - remote_port = nil - remote_addr = nil - end - [ "accepted fluent socket", {address: remote_addr, port: remote_port, instance: self.object_id} ] - } - end - - def on_connect - @on_connect_callback.call(self) - end - - # API to register callback for data arrival - def on_data(&callback) - @on_read_callback = callback - end - - def on_read(data) - @on_read_callback.call(data) - rescue => e - @log.error "unexpected error on reading data from client", address: @remote_addr, error: e - @log.error_backtrace - close - end - - def on_write_complete - closing = @mutex.synchronize { - @writing = false - @closing - } - if closing - close - end - end - - def close - writing = @mutex.synchronize { - @closing = true - @writing - } - unless writing - super - end - end - end - - class HeartbeatRequestHandler < Coolio::IO - def initialize(io, callback) - super(io) - @io = io - @callback = callback - end - - def on_readable - begin - msg, addr = @io.recvfrom(1024) - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR - return - end - host = addr[3] - port = addr[1] - @callback.call(host, port, msg) - rescue - # TODO log? - end - end - - def on_heartbeat_request(host, port, msg) - #log.trace "heartbeat request from #{host}:#{port}" - begin - @usock.send "\0", 0, host, port - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR - end - end end end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 50b9e8ac0c..8d24b8ce21 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -59,13 +59,6 @@ def create_driver(conf=CONFIG) end sub_test_case '#configure' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'simple' do @d = d = create_driver assert_equal PORT, d.instance.port @@ -88,18 +81,7 @@ def create_driver(conf=CONFIG) end end - def connect - TCPSocket.new('127.0.0.1', PORT) - end - sub_test_case 'message' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'time' do time = event_time("2011-01-02 13:14:15 UTC") begin @@ -221,6 +203,7 @@ def connect d.run(expect_records: records.length, timeout: 20) do records.each {|tag, _time, record| send_data [tag, _time, record].to_json + "\n" + sleep 1 } end @@ -229,13 +212,6 @@ def connect end sub_test_case 'forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -345,13 +321,6 @@ def connect end sub_test_case 'packed forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -464,13 +433,6 @@ def connect end sub_test_case 'compressed packed forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'set_compress_to_option' do @d = d = create_driver @@ -528,13 +490,6 @@ def connect end sub_test_case 'warning' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'send_large_chunk_warning' do @d = d = create_driver(CONFIG + %[ chunk_size_warn_limit 16M @@ -551,7 +506,7 @@ def connect d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -565,7 +520,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ + line =~ / tag="test.tag" host="127.0.0.1" limit=16777216 size=16777501/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -583,7 +538,7 @@ def connect d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -591,7 +546,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{ |line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ + line =~ / tag="test.tag" host="127.0.0.1" limit=16777216 size=16777501/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -613,7 +568,7 @@ def connect # d.run => send_data d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -625,7 +580,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_limit', dropped:/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=33554432 size=33554989/ + line =~ / tag="test.tag" host="127.0.0.1" limit=33554432 size=33554989/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -638,7 +593,7 @@ def connect # d.run => send_data d.run(shutdown: false) do - d.instance.send(:on_message, data, 1000000000, PEERADDR) + d.instance.send(:on_message, data, 1000000000, '127.0.0.1') end # check emitted data @@ -647,7 +602,7 @@ def connect # check log logs = d.instance.log.logs assert_equal 1, logs.select{|line| - line =~ / \[warn\]: incoming chunk is broken: source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" msg=#{data.inspect}/ + line =~ / \[warn\]: incoming chunk is broken: host="127.0.0.1" msg=#{data.inspect}/ }.size, "should not accept broken chunk" d.instance_shutdown @@ -655,13 +610,6 @@ def connect end sub_test_case 'respond to required ack' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -824,13 +772,6 @@ def connect end sub_test_case 'not respond without required ack' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -1068,6 +1009,10 @@ def simulate_auth_sequence(io, shared_key=SHARED_KEY, username=USER_NAME, passwo true end + def connect + TCPSocket.new('127.0.0.1', PORT) + end + # Data ordering is not assured: # Records in different sockets are processed on different thread, so its scheduling make effect # on order of emitted records. @@ -1089,13 +1034,6 @@ def send_data(data, try_to_receive_response: false, response_timeout: 5, auth: f end sub_test_case 'source_hostname_key feature' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'message protocol with source_hostname_key' do execute_test { |events| events.each { |tag, time, record|