diff --git a/.travis.yml b/.travis.yml index 90f7d13125..418341a535 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,8 @@ language: ruby cache: bundler +# script: bundle exec rake test TESTOPTS=-v + # http://rubies.travis-ci.org/ # See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c matrix: diff --git a/appveyor.yml b/appveyor.yml index 8818afc999..4f72f5f148 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -12,6 +12,7 @@ install: build: off test_script: - bundle exec rake test +# - bundle exec rake test TESTOPTS=-v branches: only: diff --git a/fluentd.gemspec b/fluentd.gemspec index 95ee52095c..730fca6f5a 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -22,7 +22,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("msgpack", [">= 0.7.0", "< 2.0.0"]) gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"]) gem.add_runtime_dependency("cool.io", ["~> 1.4.5"]) - gem.add_runtime_dependency("serverengine", ["~> 2.0"]) + gem.add_runtime_dependency("serverengine", [">= 2.0.4", "< 3.0.0"]) gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"]) gem.add_runtime_dependency("sigdump", ["~> 0.2.2"]) gem.add_runtime_dependency("tzinfo", ["~> 1.0"]) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index d89cef55dc..bb40bc9b9e 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -20,10 +20,6 @@ require 'yajl' require 'digest' -require 'fluent/plugin/socket_util' -require 'fcntl' -require 'cool.io' - module Fluent::Plugin class ForwardInput < Input Fluent::Plugin.register_input('forward', self) @@ -31,7 +27,7 @@ class ForwardInput < Input # See the wiki page below for protocol specification # https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1 - helpers :event_loop + helpers :server LISTEN_PORT = 24224 @@ -39,12 +35,15 @@ class ForwardInput < Input config_param :port, :integer, default: LISTEN_PORT desc 'The bind address to listen to.' config_param :bind, :string, default: '0.0.0.0' + config_param :backlog, :integer, default: nil # SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src desc 'The timeout time used to set linger option.' config_param :linger_timeout, :integer, default: 0 # This option is for Cool.io's loop wait timeout to avoid loop stuck at shutdown. Almost users don't need to change this value. config_param :blocking_timeout, :time, default: 0.5 + desc 'Try to resolve hostname from IP addresses or not.' + config_param :resolve_hostname, :bool, default: nil desc 'Connections will be disconnected right after receiving first message if this value is true.' config_param :deny_keepalive, :bool, default: false @@ -91,6 +90,15 @@ class ForwardInput < Input def configure(conf) super + if @source_hostname_key + # TODO: add test + if @resolve_hostname.nil? + @resolve_hostname = true + elsif !@resolve_hostname # user specifies "false" in config + raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key" + end + end + if @security if @security.user_auth && @security.users.empty? raise Fluent::ConfigError, " sections required if user_auth enabled" @@ -131,44 +139,35 @@ def configure(conf) @lsock = @usock = nil end + HEARTBEAT_UDP_PAYLOAD = "\0" + def start super - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + server_create_connection( + :in_forward_server, @port, + bind: @bind, + shared: false, + resolve_name: @resolve_hostname, + linger_timeout: @linger_timeout, + backlog: @backlog, + &method(:handle_connection) + ) + + server_create(:in_forward_server_udp_heartbeat, @port, shared: false, proto: :udp, bind: @bind, resolve_name: @resolve_hostname, max_bytes: 128) do |data, sock| + log.trace "heartbeat udp data arrived", host: sock.remote_host, port: sock.remote_port, data: data + begin + sock.write HEARTBEAT_UDP_PAYLOAD + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + log.trace "error while heartbeat response", host: sock.remote_host, error: e + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - - @lsock = listen(client) - event_loop_attach(@lsock) - - @usock = client.listen_udp(@bind, @port) - @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) - @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request)) - event_loop_attach(@hbr) - end - - def close - @lsock.close if @lsock - @usock.close if @usock - super end - def listen(client) - log.info "listening fluent socket on #{@bind}:#{@port}" - sock = client.listen_tcp(@bind, @port) - s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:handle_connection)) - s.listen(@backlog) unless @backlog.nil? - s - end - - private - def handle_connection(conn) send_data = ->(serializer, data){ conn.write serializer.call(data) } - log.trace "connected fluent socket", address: conn.remote_addr, port: conn.remote_port + log.trace "connected fluent socket", addr: conn.remote_addr, port: conn.remote_port state = :established nonce = nil user_auth_salt = nil @@ -182,7 +181,7 @@ def handle_connection(conn) state = :pingpong end - log.trace "accepted fluent socket", address: conn.remote_addr, port: conn.remote_port + log.trace "accepted fluent socket", addr: conn.remote_addr, port: conn.remote_port read_messages(conn) do |msg, chunk_size, serializer| case state @@ -198,15 +197,11 @@ def handle_connection(conn) log.debug "connection established", address: conn.remote_addr, port: conn.remote_port state = :established when :established - options = on_message(msg, chunk_size, conn.remote_addr) + options = on_message(msg, chunk_size, conn.remote_host) if options && r = response(options) - send_data.call(serializer, r) log.trace "sent response to fluent socket", address: conn.remote_addr, response: r - if @deny_keepalive - conn.on_write_complete do - conn.close - end - end + conn.on_write_complete{ conn.close } if @deny_keepalive + send_data.call(serializer, r) else if @deny_keepalive conn.close @@ -222,7 +217,7 @@ def read_messages(conn, &block) feeder = nil serializer = nil bytes = 0 - conn.on_data do |data| + conn.data do |data| # only for first call of callback unless feeder first = data[0] @@ -258,7 +253,7 @@ def response(option) nil end - def on_message(msg, chunk_size, peeraddr) + def on_message(msg, chunk_size, remote_host) if msg.nil? # for future TCP heartbeat_request return @@ -266,7 +261,7 @@ def on_message(msg, chunk_size, peeraddr) # TODO: raise an exception if broken chunk is generated by recoverable situation unless msg.is_a?(Array) - log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg + log.warn "incoming chunk is broken:", host: remote_host, msg: msg return end @@ -274,10 +269,10 @@ def on_message(msg, chunk_size, peeraddr) entries = msg[1] if @chunk_size_limit && (chunk_size > @chunk_size_limit) - log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size + log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: remote_host, limit: @chunk_size_limit, size: chunk_size return elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit) - log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size + log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: remote_host, limit: @chunk_size_warn_limit, size: chunk_size end case entries @@ -287,14 +282,14 @@ def on_message(msg, chunk_size, peeraddr) size = (option && option['size']) || 0 es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream es = es_class.new(entries, nil, size.to_i) - es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event - es = add_source_host(es, peeraddr[2]) if @source_hostname_key + es = check_and_skip_invalid_event(tag, es, remote_host) if @skip_invalid_event + es = add_source_host(es, remote_host) if @source_hostname_key router.emit_stream(tag, es) when Array # Forward es = if @skip_invalid_event - check_and_skip_invalid_event(tag, entries, peeraddr) + check_and_skip_invalid_event(tag, entries, remote_host) else es = Fluent::MultiEventStream.new entries.each { |e| @@ -306,7 +301,7 @@ def on_message(msg, chunk_size, peeraddr) } es end - es = add_source_host(es, peeraddr[2]) if @source_hostname_key + es = add_source_host(es, remote_host) if @source_hostname_key router.emit_stream(tag, es) option = msg[2] @@ -315,12 +310,12 @@ def on_message(msg, chunk_size, peeraddr) time = msg[1] record = msg[2] if @skip_invalid_event && invalid_event?(tag, time, record) - log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record + log.warn "got invalid event and drop it:", host: remote_host, tag: tag, time: time, record: record return msg[3] # retry never succeeded so return ack and drop incoming event. end return if record.nil? time = Fluent::Engine.now if time.to_i == 0 - record[@source_hostname_key] = peeraddr[2] if @source_hostname_key + record[@source_hostname_key] = remote_host if @source_hostname_key router.emit(tag, time, record) option = msg[3] end @@ -333,11 +328,11 @@ def invalid_event?(tag, time, record) !((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String)) end - def check_and_skip_invalid_event(tag, es, peeraddr) + def check_and_skip_invalid_event(tag, es, remote_host) new_es = Fluent::MultiEventStream.new es.each { |time, record| if invalid_event?(tag, time, record) - log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record + log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record next end new_es.add(time, record) @@ -354,11 +349,6 @@ def add_source_host(es, host) new_es end - def source_message(peeraddr) - _, port, host, addr = peeraddr - "host: #{host}, addr: #{addr}, port: #{port}" - end - def select_authenticate_users(node, username) if node.nil? || node[:users].empty? @security.users.select{|u| u.username == username} @@ -424,114 +414,5 @@ def generate_pong(auth_result, reason_or_salt, nonce, shared_key) shared_key_digest_hex = Digest::SHA512.new.update(reason_or_salt).update(@security.self_hostname).update(nonce).update(shared_key).hexdigest ['PONG', true, '', @security.self_hostname, shared_key_digest_hex] end - - class Handler < Coolio::Socket - attr_reader :protocol, :remote_port, :remote_addr, :remote_host - - PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"] - - def initialize(io, linger_timeout, log, on_connect_callback) - super(io) - - @peeraddr = nil - if io.is_a?(TCPSocket) # for unix domain socket support in the future - @peeraddr = (io.peeraddr rescue PEERADDR_FAILED) - opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; } - io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) - end - - ### TODO: disabling name rev resolv - proto, port, host, addr = ( io.peeraddr rescue PEERADDR_FAILED ) - if addr == '?' - port, addr = *Socket.unpack_sockaddr_in(io.getpeername) rescue nil - end - @protocol = proto - @remote_port = port - @remote_addr = addr - @remote_host = host - @writing = false - @closing = false - @mutex = Mutex.new - - @chunk_counter = 0 - @on_connect_callback = on_connect_callback - @log = log - @log.trace { - begin - remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) - rescue - remote_port = nil - remote_addr = nil - end - [ "accepted fluent socket", {address: remote_addr, port: remote_port, instance: self.object_id} ] - } - end - - def on_connect - @on_connect_callback.call(self) - end - - # API to register callback for data arrival - def on_data(&callback) - @on_read_callback = callback - end - - def on_read(data) - @on_read_callback.call(data) - rescue => e - @log.error "unexpected error on reading data from client", address: @remote_addr, error: e - @log.error_backtrace - close - end - - def on_write_complete - closing = @mutex.synchronize { - @writing = false - @closing - } - if closing - close - end - end - - def close - writing = @mutex.synchronize { - @closing = true - @writing - } - unless writing - super - end - end - end - - class HeartbeatRequestHandler < Coolio::IO - def initialize(io, callback) - super(io) - @io = io - @callback = callback - end - - def on_readable - begin - msg, addr = @io.recvfrom(1024) - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR - return - end - host = addr[3] - port = addr[1] - @callback.call(host, port, msg) - rescue - # TODO log? - end - end - - def on_heartbeat_request(host, port, msg) - #log.trace "heartbeat request from #{host}:#{port}" - begin - @usock.send "\0", 0, host, port - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR - end - end end end diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index addfc604c4..8fdc4cf7d2 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -14,28 +14,68 @@ # limitations under the License. # -require 'cool.io' +require 'fluent/plugin/input' -require 'fluent/plugin/socket_util' +module Fluent::Plugin + class TcpInput < Input + Fluent::Plugin.register_input('tcp', self) -module Fluent - class TcpInput < SocketUtil::BaseInput - Plugin.register_input('tcp', self) + helpers :server, :parser, :extract, :compat_parameters + + desc 'Tag of output events.' + config_param :tag, :string + desc 'The port to listen to.' + config_param :port, :integer, default: 5170 + desc 'The bind address to listen to.' + config_param :bind, :string, default: '0.0.0.0' + + desc "The field name of the client's hostname." + config_param :source_host_key, :string, default: nil, deprecated: "use source_hostname_key instead." + desc "The field name of the client's hostname." + config_param :source_hostname_key, :string, default: nil + + config_param :blocking_timeout, :time, default: 0.5 - config_set_default :port, 5170 desc 'The payload is read up to this character.' config_param :delimiter, :string, default: "\n" # syslog family add "\n" to each message and this seems only way to split messages in tcp stream - def listen(callback) - log.info "listening tcp socket on #{@bind}:#{@port}" + def configure(conf) + compat_parameters_convert(conf, :parser) + super + @_event_loop_blocking_timeout = @blocking_timeout + @source_hostname_key ||= @source_host_key if @source_host_key + + @parser = parser_create + end + + def start + super + + @buffer = '' + server_create(:in_tcp_server, @port, proto: :tcp, bind: @bind) do |data, conn| + @buffer << data + begin + pos = 0 + while i = @buffer.index(@delimiter, pos) + msg = @buffer[pos...i] + pos = i + @delimiter.length + + @parser.parse(msg) do |time, record| + unless time && record + log.warn "pattern not match", message: msg + next + end - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + tag = extract_tag_from_record(record) + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now + record[@source_hostname_key] = conn.remote_host if @source_hostname_key + router.emit(tag, time, record) + end + end + @buffer.slice!(0, pos) if pos > 0 + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - lsock = client.listen_tcp(@bind, @port) - Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, @delimiter, callback) end end end diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index 14f08fc574..21f8cb2441 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -14,24 +14,60 @@ # limitations under the License. # -require 'fluent/plugin/socket_util' +require 'fluent/plugin/input' -module Fluent - class UdpInput < SocketUtil::BaseInput - Plugin.register_input('udp', self) +module Fluent::Plugin + class UdpInput < Input + Fluent::Plugin.register_input('udp', self) + + helpers :server, :parser, :extract, :compat_parameters + + desc 'Tag of output events.' + config_param :tag, :string + desc 'The port to listen to.' + config_param :port, :integer, default: 5160 + desc 'The bind address to listen to.' + config_param :bind, :string, default: '0.0.0.0' + + desc "The field name of the client's hostname." + config_param :source_host_key, :string, default: nil, deprecated: "use source_hostname_key instead." + desc "The field name of the client's hostname." + config_param :source_hostname_key, :string, default: nil - config_set_default :port, 5160 config_param :body_size_limit, :size, default: 4096 - def listen(callback) - log.info "listening udp socket on #{@bind}:#{@port}" - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + config_param :blocking_timeout, :time, default: 0.5 + + def configure(conf) + compat_parameters_convert(conf, :parser) + super + @_event_loop_blocking_timeout = @blocking_timeout + @source_hostname_key ||= @source_host_key if @source_host_key + + @parser = parser_create + end + + def start + super + + log.info "listening udp socket", bind: @bind, port: @port + server_create(:in_udp_server, @port, proto: :udp, bind: @bind, max_bytes: @body_size_limit) do |data, sock| + data.chomp! + begin + @parser.parse(data) do |time, record| + unless time && record + log.warn "pattern not match", data: data + next + end + + tag = extract_tag_from_record(record) + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now + record[@source_hostname_key] = sock.remote_host if @source_hostname_key + router.emit(tag, time, record) + end + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - @usock = client.listen_udp(@bind, @port) - SocketUtil::UdpHandler.new(@usock, log, @body_size_limit, callback) end end end diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 11db4dce9a..a8ee808fbc 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -317,7 +317,7 @@ def initialize(io, callback) def on_readable begin msg, addr = @io.recvfrom(1024) - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET return end host = addr[3] diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 504f113d95..8108800687 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1176,7 +1176,7 @@ def flush_thread_run(state) flush_thread_interval = @buffer_config.flush_thread_interval # If the given clock_id is not supported, Errno::EINVAL is raised. - clock_id = Process::CLOCK_MONOTONIC rescue Process::CLOCK_MONOTONIC_RAW + clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval while !self.after_started? && !self.stopped? diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index 8977f64913..d37519a9cb 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -24,6 +24,8 @@ require 'fluent/plugin_helper/formatter' require 'fluent/plugin_helper/inject' require 'fluent/plugin_helper/extract' +# require 'fluent/plugin_helper/socket' +require 'fluent/plugin_helper/server' require 'fluent/plugin_helper/retry_state' require 'fluent/plugin_helper/compat_parameters' diff --git a/lib/fluent/plugin_helper/event_loop.rb b/lib/fluent/plugin_helper/event_loop.rb index eb45f5c5f9..5b6618c8a5 100644 --- a/lib/fluent/plugin_helper/event_loop.rb +++ b/lib/fluent/plugin_helper/event_loop.rb @@ -30,12 +30,16 @@ module EventLoop # terminate: initialize internal state EVENT_LOOP_RUN_DEFAULT_TIMEOUT = 0.5 + EVENT_LOOP_SHUTDOWN_TIMEOUT = 5 + EVENT_LOOP_CLOCK_ID = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC attr_reader :_event_loop # for tests def event_loop_attach(watcher) @_event_loop_mutex.synchronize do @_event_loop.attach(watcher) + @_event_loop_attached_watchers << watcher + watcher end end @@ -58,6 +62,7 @@ def initialize @_event_loop_mutex = Mutex.new # plugin MAY configure loop run timeout in #configure @_event_loop_run_timeout = EVENT_LOOP_RUN_DEFAULT_TIMEOUT + @_event_loop_attached_watchers = [] end def start @@ -65,19 +70,32 @@ def start # event loop does not run here, so mutex lock is not required thread_create :event_loop do - default_watcher = DefaultWatcher.new - @_event_loop.attach(default_watcher) - @_event_loop_running = true - @_event_loop.run(@_event_loop_run_timeout) # this method blocks - @_event_loop_running = false + begin + default_watcher = DefaultWatcher.new + event_loop_attach(default_watcher) + @_event_loop_running = true + @_event_loop.run(@_event_loop_run_timeout) # this method blocks + ensure + @_event_loop_running = false + end end end def shutdown @_event_loop_mutex.synchronize do - @_event_loop.watchers.each {|w| w.detach if w.attached? } + @_event_loop_attached_watchers.reverse.each do |w| + if w.attached? + w.detach + end + end end + timeout_at = Process.clock_gettime(EVENT_LOOP_CLOCK_ID) + EVENT_LOOP_SHUTDOWN_TIMEOUT while @_event_loop_running + if Process.clock_gettime(EVENT_LOOP_CLOCK_ID) >= timeout_at + log.warn "event loop does NOT exit until hard timeout." + raise "event loop does NOT exit until hard timeout." if @under_plugin_development + break + end sleep 0.1 end diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb new file mode 100644 index 0000000000..b6d28bf2ef --- /dev/null +++ b/lib/fluent/plugin_helper/server.rb @@ -0,0 +1,487 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin_helper/event_loop' + +require 'serverengine' +require 'cool.io' +require 'socket' +require 'ipaddr' +require 'fcntl' + +require_relative 'socket_option' + +module Fluent + module PluginHelper + module Server + include Fluent::PluginHelper::EventLoop + include Fluent::PluginHelper::SocketOption + + # This plugin helper doesn't support these things for now: + # * SSL/TLS (TBD) + # * TCP/TLS keepalive + + # stop : [-] + # shutdown : detach server event handler from event loop (event_loop) + # close : close listening sockets + # terminate: remote all server instances + + attr_reader :_servers # for tests + + def server_wait_until_start + # event_loop_wait_until_start works well for this + end + + def server_wait_until_stop + sleep 0.1 while @_servers.any?{|si| si.server.attached? } + @_servers.each{|si| si.server.close rescue nil } + end + + PROTOCOLS = [:tcp, :udp, :tls, :unix] + CONNECTION_PROTOCOLS = [:tcp, :tls, :unix] + + # server_create_connection(:title, @port) do |conn| + # # on connection + # source_addr = conn.remote_host + # source_port = conn.remote_port + # conn.data do |data| + # # on data + # conn.write resp # ... + # conn.close + # end + # end + def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, **socket_options, &block) + raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) + raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) + raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) + raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto) + + raise ArgumentError, "BUG: block not specified which handles connection" unless block_given? + raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1 + + if proto == :tcp || proto == :tls # default linger_timeout only for server + socket_options[:linger_timeout] ||= 0 + end + + socket_option_validate!(proto, **socket_options) + socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) } + + case proto + when :tcp + server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) + when :tls + raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts + # server_certopts_validate!(certopts) + # sock = server_create_tls_socket(shared, bind, port) + # server = nil # ... + raise "not implemented yet" + when :unix + raise "not implemented yet" + else + raise "unknown protocol #{proto}" + end + + server_attach(title, proto, port, bind, shared, server) + end + + # server_create(:title, @port) do |data| + # # ... + # end + # server_create(:title, @port) do |data, conn| + # # ... + # end + # server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock| + # sock.remote_host + # sock.remote_port + # # ... + # end + def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback) + raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) + raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) + raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) + + raise ArgumentError, "BUG: block not specified which handles received data" unless block_given? + raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2 + + if proto == :tcp || proto == :tls # default linger_timeout only for server + socket_options[:linger_timeout] ||= 0 + end + + socket_option_validate!(proto, **socket_options) + socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) } + + if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections + raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog + end + if proto != :udp # UDP options + raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes + raise ArgumentError, "BUG: flags is available only for udp" if flags != 0 + end + + case proto + when :tcp + server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn| + conn.data(&callback) + end + when :tls + raise "not implemented yet" + when :udp + raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes + sock = server_create_udp_socket(shared, bind, port) + socket_option_setter.call(sock) + server = EventHandler::UDPServer.new(sock, max_bytes, flags, @log, @under_plugin_development, &callback) + when :unix + raise "not implemented yet" + else + raise "BUG: unknown protocol #{proto}" + end + + server_attach(title, proto, port, bind, shared, server) + end + + def server_create_tcp(title, port, **kwargs, &callback) + server_create(title, port, proto: :tcp, **kwargs, &callback) + end + + def server_create_udp(title, port, **kwargs, &callback) + server_create(title, port, proto: :udp, **kwargs, &callback) + end + + def server_create_tls(title, port, **kwargs, &callback) + server_create(title, port, proto: :tls, **kwargs, &callback) + end + + def server_create_unix(title, port, **kwargs, &callback) + server_create(title, port, proto: :unix, **kwargs, &callback) + end + + ServerInfo = Struct.new(:title, :proto, :port, :bind, :shared, :server) + + def server_attach(title, proto, port, bind, shared, server) + @_servers << ServerInfo.new(title, proto, port, bind, shared, server) + event_loop_attach(server) + end + + def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) + sock = server_create_tcp_socket(shared, bind, port) + socket_option_setter.call(sock) + close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } } + server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn| + @_server_mutex.synchronize do + @_server_connections << conn + end + end + server.listen(backlog) if backlog + server + end + + def initialize + super + @_servers = [] + @_server_connections = [] + @_server_mutex = Mutex.new + end + + def shutdown + @_server_connections.each do |conn| + conn.close rescue nil + end + @_server_mutex.synchronize do + @_servers.each do |si| + si.server.detach if si.server.attached? + end + end + + super + end + + def close + @_server_connections.each do |conn| + conn.close rescue nil + end + @_server_mutex.synchronize do + @_servers.each do |si| + si.server.close rescue nil + end + end + super + end + + def terminate + @_servers = [] + super + end + + def server_certopts_validate!(certopts) + raise "not implemented yet" + end + + def server_socket_manager_client + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + if Fluent.windows? + socket_manager_path = socket_manager_path.to_i + end + ServerEngine::SocketManager::Client.new(socket_manager_path) + end + + def server_create_tcp_socket(shared, bind, port) + sock = if shared + server_socket_manager_client.listen_tcp(bind, port) + else + TCPServer.new(bind, port) # this method call can create sockets for AF_INET6 + end + # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows) + sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock + sock + end + + def server_create_udp_socket(shared, bind, port) + sock = if shared + server_socket_manager_client.listen_udp(bind, port) + else + family = IPAddr.new(IPSocket.getaddress(bind)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6 + usock = UDPSocket.new(family) + usock.bind(bind, port) + usock + end + # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows) + sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock + sock + end + + def server_create_tls_socket(shared, bind, port) + raise "not implemented yet" + end + + class CallbackSocket + def initialize(server_type, sock, enabled_events = []) + @server_type = server_type + @sock = sock + @enabled_events = enabled_events + end + + def remote_addr + @sock.peeraddr[3] + end + + def remote_host + @sock.peeraddr[2] + end + + def remote_port + @sock.peeraddr[1] + end + + def send(data, flags = 0) + @sock.send(data, flags) + end + + def write(data) + raise "not implemented here" + end + + def close + @sock.close + # close cool.io socket in another thread, not to make deadlock + # for flushing @_write_buffer when conn.close is called in callback + # ::Thread.new{ + # @sock.close + # } + end + + def data(&callback) + on(:data, &callback) + end + + def on(event, &callback) + raise "BUG: this event is disabled for #{@server_type}: #{event}" unless @enabled_events.include?(event) + case event + when :data + @sock.data(&callback) + when :write_complete + cb = ->(){ callback.call(self) } + @sock.on_write_complete(&cb) + when :close + cb = ->(){ callback.call(self) } + @sock.on_close(&cb) + else + raise "BUG: unknown event: #{event}" + end + end + end + + class TCPCallbackSocket < CallbackSocket + def initialize(sock) + super("tcp", sock, [:data, :write_complete, :close]) + end + + def write(data) + @sock.write(data) + end + end + + class UDPCallbackSocket < CallbackSocket + def initialize(sock, peeraddr) + super("udp", sock, []) + @peeraddr = peeraddr + end + + def remote_addr + @peeraddr[3] + end + + def remote_host + @peeraddr[2] + end + + def remote_port + @peeraddr[1] + end + + def write(data) + @sock.send(data, 0, @peeraddr[3], @peeraddr[1]) + end + end + + module EventHandler + class UDPServer < Coolio::IO + def initialize(sock, max_bytes, flags, log, under_plugin_development, &callback) + raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket) + + super(sock) + + @sock = sock + @max_bytes = max_bytes + @flags = flags + @log = log + @under_plugin_development = under_plugin_development + @callback = callback + + on_readable_impl = case @callback.arity + when 1 then :on_readable_without_sock + when 2 then :on_readable_with_sock + else + raise "BUG: callback block must have 1 or 2 arguments" + end + self.define_singleton_method(:on_readable, method(on_readable_impl)) + end + + def on_readable_without_sock + begin + data = @sock.recv(@max_bytes, @flags) + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + return + end + @callback.call(data) + rescue => e + @log.error "unexpected error in processing UDP data", error: e + @log.error_backtrace + raise if @under_plugin_development + end + + def on_readable_with_sock + begin + data, addr = @sock.recvfrom(@max_bytes) + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + return + end + @callback.call(data, UDPCallbackSocket.new(@sock, addr)) + rescue => e + @log.error "unexpected error in processing UDP data", error: e + @log.error_backtrace + raise if @under_plugin_development + end + end + + class TCPServer < Coolio::TCPSocket + def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) + raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket) + + socket_option_setter.call(sock) + + @_handler_socket = sock + super(sock) + + @log = log + @under_plugin_development = under_plugin_development + + @connect_callback = connect_callback + @data_callback = nil + @close_callback = close_callback + + @callback_connection = nil + @closing = false + + @mutex = Mutex.new # to serialize #write and #close + end + + def data(&callback) + raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read) + @data_callback = callback + on_read_impl = case callback.arity + when 1 then :on_read_without_connection + when 2 then :on_read_with_connection + else + raise "BUG: callback block must have 1 or 2 arguments" + end + self.define_singleton_method(:on_read, method(on_read_impl)) + end + + def write(data) + @mutex.synchronize do + super + end + end + + def on_connect + @callback_connection = TCPCallbackSocket.new(self) + @connect_callback.call(@callback_connection) + unless @data_callback + raise "connection callback must call #data to set data callback" + end + end + + def on_read_without_connection(data) + @data_callback.call(data) + rescue => e + @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e + @log.error_backtrace + close(true) rescue nil + raise if @under_plugin_development + end + + def on_read_with_connection(data) + @data_callback.call(data, @callback_connection) + rescue => e + @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e + @log.error_backtrace + close(true) rescue nil + raise if @under_plugin_development + end + + def close + @mutex.synchronize do + return if @closing + @closing = true + @close_callback.call(self) + super + end + end + end + end + end + end +end diff --git a/lib/fluent/plugin_helper/socket_option.rb b/lib/fluent/plugin_helper/socket_option.rb new file mode 100644 index 0000000000..7ef9a933f0 --- /dev/null +++ b/lib/fluent/plugin_helper/socket_option.rb @@ -0,0 +1,80 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'socket' + +# this module is only for Socket/Server plugin helpers +module Fluent + module PluginHelper + module SocketOption + FORMAT_STRUCT_LINGER = 'I!I!' # { int l_onoff; int l_linger; } + FORMAT_STRUCT_TIMEVAL = 'L!L!' # { time_t tv_sec; suseconds_t tv_usec; } + + def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil) + unless resolve_name.nil? + if protocol != :tcp && protocol != :udp && protocol != :tls + raise ArgumentError, "BUG: resolve_name in available for tcp/udp/tls" + end + end + if linger_timeout + if protocol != :tcp && protocol != :tls + raise ArgumentError, "BUG: linger_timeout is available for tcp/tls" + end + end + if certopts + if protocol != :tls + raise ArgumentError, "BUG: certopts is available only for tls" + end + else + if protocol == :tls + raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" + socket_option_certopts_validate!(certopts) + end + end + end + + def socket_option_certopts_validate!(certopts) + raise "not implemented yet" + end + + def socket_option_set(sock, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil) + unless resolve_name.nil? + sock.do_not_reverse_lookup = !resolve_name + end + if linger_timeout + optval = [1, linger_timeout.to_i].pack(FORMAT_STRUCT_LINGER) + socket_option_set_one(sock, :SO_LINGER, optval) + end + if recv_timeout + optval = [recv_timeout.to_i, 0].pack(FORMAT_STRUCT_TIMEVAL) + socket_option_set_one(sock, :SO_RCVTIMEO, optval) + end + if send_timeout + optval = [send_timeout.to_i, 0].pack(FORMAT_STRUCT_TIMEVAL) + socket_option_set_one(sock, :SO_SNDTIMEO, optval) + end + # TODO: certopts for TLS + sock + end + + def socket_option_set_one(sock, option, value) + sock.setsockopt(Socket::SOL_SOCKET, option, value) + rescue => e + log.warn "failed to set socket option", sock: sock.class, option: option, value: value, error: e + end + end + end +end diff --git a/lib/fluent/plugin_helper/timer.rb b/lib/fluent/plugin_helper/timer.rb index 8a1c62a638..02fac1ced8 100644 --- a/lib/fluent/plugin_helper/timer.rb +++ b/lib/fluent/plugin_helper/timer.rb @@ -36,6 +36,7 @@ def timer_execute(title, interval, repeat: true, &block) timer = TimerWatcher.new(title, interval, repeat, log, checker, &block) @_timers << title event_loop_attach(timer) + timer end def timer_running? diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb index 6b7d5af237..5f52af883b 100644 --- a/lib/fluent/test/driver/base.rb +++ b/lib/fluent/test/driver/base.rb @@ -18,6 +18,8 @@ require 'fluent/config/element' require 'fluent/log' +require 'serverengine/socket_manager' +require 'fileutils' require 'timeout' module Fluent @@ -46,6 +48,8 @@ def initialize(klass, opts: {}, &block) end @instance.under_plugin_development = true + @socket_manager_server = nil + @logs = [] @test_clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC @@ -101,6 +105,15 @@ def run(timeout: nil, start: true, shutdown: true, &block) end def instance_start + if @instance.respond_to?(:server_wait_until_start) + @socket_manager_path = ServerEngine::SocketManager::Server.generate_path + if @socket_manager_path.is_a?(String) && File.exist?(@socket_manager_path) + FileUtils.rm_f @socket_manager_path + end + @socket_manager_server = ServerEngine::SocketManager::Server.open(@socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s + end + unless @instance.started? @instance.start end @@ -145,7 +158,18 @@ def instance_shutdown @instance.thread_wait_until_stop end + if @instance.respond_to?(:server_wait_until_stop) + @instance.server_wait_until_stop + end + @instance.terminate unless @instance.terminated? + + if @socket_manager_server + @socket_manager_server.close + if @socket_manager_server.is_a?(String) && File.exist?(@socket_manager_path) + FileUtils.rm_f @socket_manager_path + end + end end def run_actual(timeout: DEFAULT_TIMEOUT, &block) diff --git a/test/helper.rb b/test/helper.rb index 16ef30f348..592bc423a6 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -69,7 +69,18 @@ class Test::Unit::AssertionFailedError < StandardError include Fluent::Test::Helpers -def unused_port(num = 1) +def unused_port(num = 1, protocol: :tcp, bind: "0.0.0.0") + case protocol + when :tcp + unused_port_tcp(num) + when :udp + unused_port_udp(num, bind: bind) + else + raise ArgumentError, "unknown protocol: #{protocol}" + end +end + +def unused_port_tcp(num = 1) ports = [] sockets = [] num.times do @@ -85,6 +96,30 @@ def unused_port(num = 1) end end +PORT_RANGE_AVAILABLE = (1024...65535) + +def unused_port_udp(num = 1, bind: "0.0.0.0") + family = IPAddr.new(IPSocket.getaddress(bind)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6 + ports = [] + sockets = [] + while ports.size < num + port = rand(PORT_RANGE_AVAILABLE) + u = UDPSocket.new(family) + if (u.bind(bind, port) rescue nil) + ports << port + sockets << u + else + u.close + end + end + sockets.each{|s| s.close } + if num == 1 + return ports.first + else + return *ports + end +end + def waiting(seconds, logs: nil, plugin: nil) begin Timeout.timeout(seconds) do diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 50b9e8ac0c..8d24b8ce21 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -59,13 +59,6 @@ def create_driver(conf=CONFIG) end sub_test_case '#configure' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'simple' do @d = d = create_driver assert_equal PORT, d.instance.port @@ -88,18 +81,7 @@ def create_driver(conf=CONFIG) end end - def connect - TCPSocket.new('127.0.0.1', PORT) - end - sub_test_case 'message' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'time' do time = event_time("2011-01-02 13:14:15 UTC") begin @@ -221,6 +203,7 @@ def connect d.run(expect_records: records.length, timeout: 20) do records.each {|tag, _time, record| send_data [tag, _time, record].to_json + "\n" + sleep 1 } end @@ -229,13 +212,6 @@ def connect end sub_test_case 'forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -345,13 +321,6 @@ def connect end sub_test_case 'packed forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -464,13 +433,6 @@ def connect end sub_test_case 'compressed packed forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'set_compress_to_option' do @d = d = create_driver @@ -528,13 +490,6 @@ def connect end sub_test_case 'warning' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'send_large_chunk_warning' do @d = d = create_driver(CONFIG + %[ chunk_size_warn_limit 16M @@ -551,7 +506,7 @@ def connect d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -565,7 +520,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ + line =~ / tag="test.tag" host="127.0.0.1" limit=16777216 size=16777501/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -583,7 +538,7 @@ def connect d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -591,7 +546,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{ |line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ + line =~ / tag="test.tag" host="127.0.0.1" limit=16777216 size=16777501/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -613,7 +568,7 @@ def connect # d.run => send_data d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -625,7 +580,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_limit', dropped:/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=33554432 size=33554989/ + line =~ / tag="test.tag" host="127.0.0.1" limit=33554432 size=33554989/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -638,7 +593,7 @@ def connect # d.run => send_data d.run(shutdown: false) do - d.instance.send(:on_message, data, 1000000000, PEERADDR) + d.instance.send(:on_message, data, 1000000000, '127.0.0.1') end # check emitted data @@ -647,7 +602,7 @@ def connect # check log logs = d.instance.log.logs assert_equal 1, logs.select{|line| - line =~ / \[warn\]: incoming chunk is broken: source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" msg=#{data.inspect}/ + line =~ / \[warn\]: incoming chunk is broken: host="127.0.0.1" msg=#{data.inspect}/ }.size, "should not accept broken chunk" d.instance_shutdown @@ -655,13 +610,6 @@ def connect end sub_test_case 'respond to required ack' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -824,13 +772,6 @@ def connect end sub_test_case 'not respond without required ack' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -1068,6 +1009,10 @@ def simulate_auth_sequence(io, shared_key=SHARED_KEY, username=USER_NAME, passwo true end + def connect + TCPSocket.new('127.0.0.1', PORT) + end + # Data ordering is not assured: # Records in different sockets are processed on different thread, so its scheduling make effect # on order of emitted records. @@ -1089,13 +1034,6 @@ def send_data(data, try_to_receive_response: false, response_timeout: 5, auth: f end sub_test_case 'source_hostname_key feature' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'message protocol with source_hostname_key' do execute_test { |events| events.each { |tag, time, record| diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb index c6416a1672..c53b4478cc 100755 --- a/test/plugin/test_in_tcp.rb +++ b/test/plugin/test_in_tcp.rb @@ -1,20 +1,8 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_tcp' class TcpInputTest < Test::Unit::TestCase - class << self - def startup - socket_manager_path = ServerEngine::SocketManager::Server.generate_path - @server = ServerEngine::SocketManager::Server.open(socket_manager_path) - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s - end - - def shutdown - @server.close - end - end - def setup Fluent::Test.setup end @@ -34,69 +22,97 @@ def setup ] def create_driver(conf) - Fluent::Test::InputTestDriver.new(Fluent::TcpInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::TcpInput).configure(conf) end - def test_configure - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + def create_tcp_socket(host, port, &block) + if block_given? + TCPSocket.open(host, port, &block) + else + TCPSocket.open(host, port) + end + end + + + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'configure' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? - configs.each_pair { |k, v| - d = create_driver(v) - assert_equal PORT, d.instance.port - assert_equal k, d.instance.bind - assert_equal "\n", d.instance.delimiter - } + d = create_driver(conf) + assert_equal PORT, d.instance.port + assert_equal bind, d.instance.bind + assert_equal "\n", d.instance.delimiter end - { - 'none' => [ - {'msg' => "tcptest1\n", 'expected' => 'tcptest1'}, - {'msg' => "tcptest2\n", 'expected' => 'tcptest2'}, - ], - 'json' => [ - {'msg' => {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", 'expected' => 'tcptest1'}, - {'msg' => {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", 'expected' => 456}, - ] - }.each { |format, test_cases| - define_method("test_msg_size_#{format}") do - d = create_driver(BASE_CONFIG + "format #{format}") - tests = test_cases + test_case_data = { + 'none' => { + 'format' => 'none', + 'payloads' => [ "tcptest1\n", "tcptest2\n" ], + 'expecteds' => [ + {'message' => 'tcptest1'}, + {'message' => 'tcptest2'}, + ], + }, + 'json' => { + 'format' => 'json', + 'payloads' => [ + {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", + {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", + ], + 'expecteds' => [ + {'k' => 123, 'message' => 'tcptest1'}, + {'k' => 'tcptest2', 'message' => 456} + ], + }, + } + + data(test_case_data) + test 'test_msg_size' do |data| + format = data['format'] + payloads = data['payloads'] + expecteds = data['expecteds'] - d.run do - tests.each {|test| - TCPSocket.open('127.0.0.1', PORT) do |s| - s.send(test['msg'], 0) - end - } - sleep 1 + d = create_driver(BASE_CONFIG + "format #{format}") + d.run(expect_records: 2) do + payloads.each do |payload| + create_tcp_socket('127.0.0.1', PORT) do |sock| + sock.send(payload, 0) + end end + end - compare_test_result(d.emits, tests) + assert_equal 2, d.events.size + expecteds.each_with_index do |expected_record, i| + assert_equal "tcp", d.events[i][0] + assert d.events[i][1].is_a?(Fluent::EventTime) + assert_equal expected_record, d.events[i][2] end + end - define_method("test_msg_size_with_same_connection_#{format}") do - d = create_driver(BASE_CONFIG + "format #{format}") - tests = test_cases + data(test_case_data) + test 'test data in a connection' do |data| + format = data['format'] + payloads = data['payloads'] + expecteds = data['expecteds'] - d.run do - TCPSocket.open('127.0.0.1', PORT) do |s| - tests.each {|test| - s.send(test['msg'], 0) - } + d = create_driver(BASE_CONFIG + "format #{format}") + d.run(expect_records: 2) do + create_tcp_socket('127.0.0.1', PORT) do |sock| + payloads.each do |payload| + sock.send(payload, 0) end - sleep 1 end - - compare_test_result(d.emits, tests) end - } - def compare_test_result(emits, tests) - assert_equal(2, emits.size) - emits.each_index {|i| - assert_equal(tests[i]['expected'], emits[i][2]['message']) - assert(emits[i][1].is_a?(Fluent::EventTime)) - } + assert_equal 2, d.events.size + expecteds.each_with_index do |expected_record, i| + assert_equal "tcp", d.events[i][0] + assert d.events[i][1].is_a?(Fluent::EventTime) + assert_equal expected_record, d.events[i][2] + end end end diff --git a/test/plugin/test_in_udp.rb b/test/plugin/test_in_udp.rb index 01c47c1b8a..d74588b77e 100755 --- a/test/plugin/test_in_udp.rb +++ b/test/plugin/test_in_udp.rb @@ -1,20 +1,8 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_udp' class UdpInputTest < Test::Unit::TestCase - class << self - def startup - socket_manager_path = ServerEngine::SocketManager::Server.generate_path - @server = ServerEngine::SocketManager::Server.open(socket_manager_path) - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s - end - - def shutdown - @server.close - end - end - def setup Fluent::Test.setup end @@ -34,88 +22,121 @@ def setup ! def create_driver(conf) - Fluent::Test::InputTestDriver.new(Fluent::UdpInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::UdpInput).configure(conf) end - def test_configure - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + def create_udp_socket(host, port) + u = if IPAddr.new(IPSocket.getaddress(host)).ipv4? + UDPSocket.new(Socket::AF_INET) + else + UDPSocket.new(Socket::AF_INET6) + end + u.connect(host, port) + if block_given? + begin + yield u + ensure + u.close rescue nil + end + else + u + end + end - configs.each_pair { |k, v| - d = create_driver(v) - assert_equal PORT, d.instance.port - assert_equal k, d.instance.bind - assert_equal 4096, d.instance.body_size_limit - } + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'configure' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? + + d = create_driver(conf) + assert_equal PORT, d.instance.port + assert_equal bind, d.instance.bind + assert_equal 4096, d.instance.body_size_limit end - def test_time_format - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'time_format' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? - configs.each_pair { |k, v| - d = create_driver(v) + d = create_driver(conf) - tests = [ - {'msg' => '[Sep 11 00:00:00] localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 11 00:00:00', '%b %d %H:%M:%S'))}, - {'msg' => '[Sep 1 00:00:00] localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 1 00:00:00', '%b %d %H:%M:%S'))}, - ] + tests = [ + {'msg' => '[Sep 11 00:00:00] localhost logger: foo', 'expected' => event_time('Sep 11 00:00:00', format: '%b %d %H:%M:%S')}, + {'msg' => '[Sep 1 00:00:00] localhost logger: foo', 'expected' => event_time('Sep 1 00:00:00', format: '%b %d %H:%M:%S')}, + ] - d.run do - u = Fluent::SocketUtil.create_udp_socket(k) - u.connect(k, PORT) - tests.each {|test| + d.run(expect_records: 2) do + create_udp_socket(bind, PORT) do |u| + tests.each do |test| u.send(test['msg'], 0) - } - u.close - sleep 1 + end end + end - emits = d.emits - emits.each_index {|i| - assert_equal_event_time(tests[i]['expected'], emits[i][1]) - } - } - + events = d.events + tests.each_with_index do |t, i| + assert_equal_event_time(t['expected'], events[i][1]) + end end - { - 'none' => [ - {'msg' => "tcptest1\n", 'expected' => 'tcptest1'}, - {'msg' => "tcptest2\n", 'expected' => 'tcptest2'}, - ], - 'json' => [ - {'msg' => {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", 'expected' => 'tcptest1'}, - {'msg' => {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", 'expected' => 456}, - ], - '/^\\[(?