From a01883a2762042ac2efa6d82f697c2b3a1b7ee73 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 15 Nov 2016 17:28:26 +0900 Subject: [PATCH 01/29] make sure to mark as loop not running --- lib/fluent/plugin_helper/event_loop.rb | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin_helper/event_loop.rb b/lib/fluent/plugin_helper/event_loop.rb index eb45f5c5f9..909afdb294 100644 --- a/lib/fluent/plugin_helper/event_loop.rb +++ b/lib/fluent/plugin_helper/event_loop.rb @@ -65,11 +65,14 @@ def start # event loop does not run here, so mutex lock is not required thread_create :event_loop do - default_watcher = DefaultWatcher.new - @_event_loop.attach(default_watcher) - @_event_loop_running = true - @_event_loop.run(@_event_loop_run_timeout) # this method blocks - @_event_loop_running = false + begin + default_watcher = DefaultWatcher.new + @_event_loop.attach(default_watcher) + @_event_loop_running = true + @_event_loop.run(@_event_loop_run_timeout) # this method blocks + ensure + @_event_loop_running = false + end end end From 6874665d73f4258211d93269d96466356c248215 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 16 Nov 2016 18:23:42 +0900 Subject: [PATCH 02/29] add server plugin helper --- lib/fluent/plugin_helper.rb | 2 + lib/fluent/plugin_helper/server.rb | 411 +++++++++++++++++++++++++++++ lib/fluent/test/driver/base.rb | 20 ++ 3 files changed, 433 insertions(+) create mode 100644 lib/fluent/plugin_helper/server.rb diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index 8977f64913..d37519a9cb 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -24,6 +24,8 @@ require 'fluent/plugin_helper/formatter' require 'fluent/plugin_helper/inject' require 'fluent/plugin_helper/extract' +# require 'fluent/plugin_helper/socket' +require 'fluent/plugin_helper/server' require 'fluent/plugin_helper/retry_state' require 'fluent/plugin_helper/compat_parameters' diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb new file mode 100644 index 0000000000..8b4660e1b6 --- /dev/null +++ b/lib/fluent/plugin_helper/server.rb @@ -0,0 +1,411 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin_helper/event_loop' + +require 'serverengine/socket_manager' +require 'cool.io' +require 'socket' +require 'ipaddr' +require 'fcntl' + +module Fluent + module PluginHelper + module Server + include Fluent::PluginHelper::EventLoop + + # This plugin helper doesn't support these things for now: + # * SSL/TLS (TBD) + # * IPv6 + # * TCP/TLS keepalive + + # stop : [-] + # shutdown : detach server event handler from event loop (event_loop) + # close : close listening sockets + # terminate: remote all server instances + + attr_reader :_servers # for tests + + def server_wait_until_start + # event_loop_wait_until_start works well for this + end + + def server_wait_until_stop + sleep 0.1 while @_servers.any?{|si| si.server.attached? } + @_servers.each{|si| si.server.close rescue nil } + end + + PROTOCOLS = [:tcp, :udp, :tls, :unix] + CONNECTION_PROTOCOLS = [:tcp, :tls, :unix] + + # server_create_connection(:title, @port) do |conn| + # # on connection + # source_addr = conn.remote_host + # source_port = conn.remote_port + # conn.data do |data| + # # on data + # conn.write resp # ... + # conn.close + # end + # end + def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block) + raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol + raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto) + raise ArgumentError, "BUG: block not specified which handles connection" unless block_given? + raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1 + + case proto + when :tcp + server = server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) + when :tls + raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts + # server_certopts_validate!(certopts) + # sock = server_create_tls_socket(shared, bind, port) + # server = nil # ... + raise "not implemented yet" + when :unix + raise "not implemented yet" + else + raise "unknown protocol #{proto}" + end + + server_attach(title, port, bind, shared, server) + end + + # server_create(:title, @port) do |data| + # # ... + # end + # server_create(:title, @port) do |data, conn| + # # ... + # end + # server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock| + # sock.remote_host + # sock.remote_port + # # ... + # end + def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, max_bytes: nil, flags: 0, &callback) + raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol + raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) + + raise ArgumentError, "BUG: block not specified which handles received data" unless block_given? + raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2 + + case proto + when :tcp + server = server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog) do |conn| + conn.data(&callback) + end + when :tls + raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts + server_certopts_validate!(certopts) + raise "not implemented yet" + when :udp + raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes + sock = server_create_udp_socket(shared, bind, port) + sock.do_not_reverse_lookup = !resolve_name + server = EventHandler::UDPServer.new(sock, resolve_name, max_bytes, flags, @log, @under_plugin_development, &callback) + when :unix + raise "not implemented yet" + else + raise "BUG: unknown protocol #{proto}" + end + + server_attach(title, port, bind, shared, server) + end + + def server_create_tcp(title, port, **kwargs, &callback) + server_create(title, port, proto: :tcp, **kwargs, &callback) + end + + def server_create_udp(title, port, **kwargs, &callback) + server_create(title, port, proto: :udp, **kwargs, &callback) + end + + ServerInfo = Struct.new(:title, :port, :bind, :shared, :server) + + def server_attach(title, port, bind, shared, server) + @_servers << ServerInfo.new(title, port, bind, shared, server) + event_loop_attach(server) + end + + def server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) + sock = server_create_tcp_socket(shared, bind, port) + server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, resolve_name, linger_timeout, @log, @under_plugin_development, block) + server.listen(backlog) if backlog + server + end + + def initialize + super + @_servers = [] + end + + def close + @_servers.each do |si| + si.server.close rescue nil + end + + super + end + + def terminate + @_servers = [] + super + end + + def server_certopts_validate!(certopts) + raise "not implemented yet" + end + + def server_socket_manager_client + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + if Fluent.windows? + socket_manager_path = socket_manager_path.to_i + end + ServerEngine::SocketManager::Client.new(socket_manager_path) + end + + def server_create_tcp_socket(shared, bind, port) + sock = if shared + server_socket_manager_client.listen_tcp(bind, port) + else + TCPServer.new(bind, port) # this method call can create sockets for AF_INET6 + end + sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec + sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock + sock + end + + def server_create_udp_socket(shared, bind, port) + sock = if shared + server_socket_manager_client.listen_udp(bind, port) + else + family = IPAddr.new(IPSocket.getaddress(bind)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6 + usock = UDPSocket.new(family) + usock.bind(bind, port) + usock + end + sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec + sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock + sock + end + + def server_create_tls_socket(shared, bind, port) + raise "not implemented yet" + end + + class CallbackSocket + def initialize(server_type, sock, enabled_events = []) + @server_type = server_type + @sock = sock + @enabled_events = enabled_events + end + + def remote_addr + @sock.peeraddr[3] + end + + def remote_host + @sock.peeraddr[2] + end + + def remote_port + @sock.peeraddr[1] + end + + def send(data, flag) + @sock.send(data, flag) + end + + def write(data) + @sock.write(data) + end + + def close + @sock.close + end + + def data(&callback) + on(:data, &callback) + end + + def on(event, &callback) + raise "BUG: this event is disabled for #{@server_type}" unless @enabled_events.include?(event) + case event + when :data + @sock.data(&callback) + when :write_complete + @sock.on_write_complete(&callback) + when :before_close + @sock.before_close(&callback) + when :close + @sock.on_close(&callback) + else + raise "BUG: unknown event: #{event}" + end + end + end + + class TCPCallbackSocket < CallbackSocket + def initialize(sock) + super("tcp", sock, [:data, :write_complete, :before_close, :close]) + end + end + + class UDPCallbackSocket < CallbackSocket + def initialize(sock) + super("udp", sock, [:write_complete]) + end + + def write(data) + self.send(data, 0) + end + end + + module EventHandler + class UDPServer < Coolio::IO + def initialize(sock, resolve_name, max_bytes, flags, log, under_plugin_development, &callback) + raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket) + + super(sock) + + @sock = sock + @resolve_name = resolve_name + @max_bytes = max_bytes + @flags = flags + @log = log + @under_plugin_development = under_plugin_development + @callback = callback + + @sock.do_not_reverse_lookup = !resolve_name + on_readable_impl = case @callback.arity + when 1 then :on_readable_without_sock + when 2 then :on_readable_with_sock + else + raise "BUG: callback block must have 1 or 2 arguments" + end + self.define_singleton_method(:on_readable, method(on_readable_impl)) + end + + def on_readable_without_sock + begin + data = @sock.recv(@max_bytes, @flags) + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + return + end + @callback.call(data) + rescue => e + @log.error "unexpected error in processing UDP data", error: e + @log.error_backtrace + raise if @under_plugin_development + end + + def on_readable_with_sock + begin + data, addr = @sock.recvfrom(@max_bytes) + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + return + end + sock = UDPSocket.new(addr[0]) # Address family: "AF_INET", "AF_INET6" + sock.do_not_reverse_lookup = !@resolve_name + sock.connect(addr[3], addr[1]) + @callback.call(data, UDPCallbackSocket.new(sock)) + rescue => e + @log.error "unexpected error in processing UDP data", error: e + @log.error_backtrace + raise if @under_plugin_development + end + end + + class TCPServer < Coolio::TCPSocket + SOCK_OPT_FORMAT = 'I!I!' # { int l_onoff; int l_linger; } + + def initialize(sock, resolve_name, linger_timeout, log, under_plugin_development, connect_callback) + raise ArgumentError, "socket must be a TCPSocket" unless sock.is_a?(TCPSocket) + + super(sock) + + sock_opt = [1, linger_timeout].pack(SOCK_OPT_FORMAT) + sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, sock_opt) + + @resolve_name = resolve_name + @log = log + @connect_callback = connect_callback + + @under_plugin_development = under_plugin_development + + @data_callback = nil + @closing = false + @mutex = Mutex.new # to serialize #write and #close + end + + def data(&callback) + @data_callback = callback + on_read_impl = case callback.arity + when 1 then :on_read_without_connection + when 2 then :on_read_with_connection + else + raise "BUG: callback block must have 1 or 2 arguments" + end + self.define_singleton_method(:on_read, method(on_read_impl)) + end + + def write(data) + raise IOError, "server TCP connection is already going to be closed" if @closing + @mutex.synchronize do + super + end + end + + def on_connect + conn = TCPCallbackSocket.new(self) + @connect_callback.call(conn) + unless @data_callback + raise "connection callback must call #data to set data callback" + end + end + + def on_read_without_connection(data) + @data_callback.call(data) + rescue => e + @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e + @log.error_backtrace + close(true) rescue nil + raise if @under_plugin_development + end + + def on_read_with_connection(data) + @data_callback.call(data, self) + rescue => e + @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e + @log.error_backtrace + close(true) rescue nil + raise if @under_plugin_development + end + + def close(force = false) + @closing = true + if force + super() + else + @mutex.synchronize{ super() } + end + end + end + end + end + end +end diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb index 6b7d5af237..1a5fbe39c3 100644 --- a/lib/fluent/test/driver/base.rb +++ b/lib/fluent/test/driver/base.rb @@ -18,6 +18,8 @@ require 'fluent/config/element' require 'fluent/log' +require 'serverengine/socket_manager' +require 'fileutils' require 'timeout' module Fluent @@ -46,6 +48,8 @@ def initialize(klass, opts: {}, &block) end @instance.under_plugin_development = true + @socket_manager_server = nil + @logs = [] @test_clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC @@ -101,6 +105,13 @@ def run(timeout: nil, start: true, shutdown: true, &block) end def instance_start + if @instance.respond_to?(:server_wait_until_start) + @socket_manager_path = ServerEngine::SocketManager::Server.generate_path + FileUtils.rm_f @socket_manager_path if File.exist?(@socket_manager_path) + @socket_manager_server = ServerEngine::SocketManager::Server.open(@socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s + end + unless @instance.started? @instance.start end @@ -145,7 +156,16 @@ def instance_shutdown @instance.thread_wait_until_stop end + if @instance.respond_to?(:server_wait_until_stop) + @instance.server_wait_until_stop + end + @instance.terminate unless @instance.terminated? + + if @socket_manager_server + @socket_manager_server.close + FileUtils.rm_f @socket_manager_path if File.exist?(@socket_manager_path) + end end def run_actual(timeout: DEFAULT_TIMEOUT, &block) From 1e7b78fa7035084be7756f6694f49ce8bcc180be Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 16 Nov 2016 18:24:22 +0900 Subject: [PATCH 03/29] migrate plugins to v0.14 APIs with server plugin helper --- lib/fluent/plugin/in_tcp.rb | 66 ++++++++++--- lib/fluent/plugin/in_udp.rb | 60 +++++++++--- test/plugin/test_in_tcp.rb | 140 ++++++++++++++++------------ test/plugin/test_in_udp.rb | 181 ++++++++++++++++++++---------------- 4 files changed, 278 insertions(+), 169 deletions(-) diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index addfc604c4..37dc5a10ba 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -14,28 +14,66 @@ # limitations under the License. # -require 'cool.io' +require 'fluent/plugin/input' -require 'fluent/plugin/socket_util' +module Fluent::Plugin + class TcpInput < Input + Fluent::Plugin.register_input('tcp', self) -module Fluent - class TcpInput < SocketUtil::BaseInput - Plugin.register_input('tcp', self) + helpers :server, :parser, :extract, :compat_parameters + + desc 'Tag of output events.' + config_param :tag, :string + desc 'The port to listen to.' + config_param :port, :integer, default: 5170 + desc 'The bind address to listen to.' + config_param :bind, :string, default: '0.0.0.0' + + desc "The field name of the client's hostname." + config_param :source_host_key, :string, default: nil, deprecated: "use source_hostname_key instead." + desc "The field name of the client's hostname." + config_param :source_hostname_key, :string, default: nil + + config_param :blocking_timeout, :time, default: 0.5 - config_set_default :port, 5170 desc 'The payload is read up to this character.' config_param :delimiter, :string, default: "\n" # syslog family add "\n" to each message and this seems only way to split messages in tcp stream - def listen(callback) - log.info "listening tcp socket on #{@bind}:#{@port}" + def configure(conf) + compat_parameters_convert(conf, :parser) + super + @_event_loop_blocking_timeout = @blocking_timeout + @parser = parser_create + end + + def start + super + + @buffer = '' + server_create(:in_tcp_server, @port, proto: :tcp, bind: @bind) do |data, conn| + @buffer << data + begin + pos = 0 + while i = @buffer.index(@delimiter, pos) + msg = @buffer[pos...i] + pos = i + @delimiter.length + + @parser.parse(msg) do |time, record| + unless time && record + log.warn "pattern not match", message: msg + next + end - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + tag = extract_tag_from_record(record) + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now + record[@source_host_key] = conn.remote_host if @source_host_key + router.emit(tag, time, record) + end + end + @buffer.slice!(0, pos) if pos > 0 + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - lsock = client.listen_tcp(@bind, @port) - Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, @delimiter, callback) end end end diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index 14f08fc574..d4718a8c1d 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -14,24 +14,58 @@ # limitations under the License. # -require 'fluent/plugin/socket_util' +require 'fluent/plugin/input' -module Fluent - class UdpInput < SocketUtil::BaseInput - Plugin.register_input('udp', self) +module Fluent::Plugin + class UdpInput < Input + Fluent::Plugin.register_input('udp', self) + + helpers :server, :parser, :extract, :compat_parameters + + desc 'Tag of output events.' + config_param :tag, :string + desc 'The port to listen to.' + config_param :port, :integer, default: 5160 + desc 'The bind address to listen to.' + config_param :bind, :string, default: '0.0.0.0' + + desc "The field name of the client's hostname." + config_param :source_host_key, :string, default: nil, deprecated: "use source_hostname_key instead." + desc "The field name of the client's hostname." + config_param :source_hostname_key, :string, default: nil - config_set_default :port, 5160 config_param :body_size_limit, :size, default: 4096 - def listen(callback) - log.info "listening udp socket on #{@bind}:#{@port}" - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + config_param :blocking_timeout, :time, default: 0.5 + + def configure(conf) + compat_parameters_convert(conf, :parser) + super + @_event_loop_blocking_timeout = @blocking_timeout + @parser = parser_create + end + + def start + super + + log.info "listening udp socket", bind: @bind, port: @port + server_create(:in_udp_server, @port, proto: :udp, bind: @bind, max_bytes: @body_size_limit) do |data, sock| + data.chomp! + begin + @parser.parse(data) do |time, record| + unless time && record + log.warn "pattern not match", data: data + next + end + + tag = extract_tag_from_record(record) + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now + record[@source_host_key] = sock.remote_host if @source_host_key + router.emit(tag, time, record) + end + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - @usock = client.listen_udp(@bind, @port) - SocketUtil::UdpHandler.new(@usock, log, @body_size_limit, callback) end end end diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb index c6416a1672..c53b4478cc 100755 --- a/test/plugin/test_in_tcp.rb +++ b/test/plugin/test_in_tcp.rb @@ -1,20 +1,8 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_tcp' class TcpInputTest < Test::Unit::TestCase - class << self - def startup - socket_manager_path = ServerEngine::SocketManager::Server.generate_path - @server = ServerEngine::SocketManager::Server.open(socket_manager_path) - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s - end - - def shutdown - @server.close - end - end - def setup Fluent::Test.setup end @@ -34,69 +22,97 @@ def setup ] def create_driver(conf) - Fluent::Test::InputTestDriver.new(Fluent::TcpInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::TcpInput).configure(conf) end - def test_configure - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + def create_tcp_socket(host, port, &block) + if block_given? + TCPSocket.open(host, port, &block) + else + TCPSocket.open(host, port) + end + end + + + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'configure' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? - configs.each_pair { |k, v| - d = create_driver(v) - assert_equal PORT, d.instance.port - assert_equal k, d.instance.bind - assert_equal "\n", d.instance.delimiter - } + d = create_driver(conf) + assert_equal PORT, d.instance.port + assert_equal bind, d.instance.bind + assert_equal "\n", d.instance.delimiter end - { - 'none' => [ - {'msg' => "tcptest1\n", 'expected' => 'tcptest1'}, - {'msg' => "tcptest2\n", 'expected' => 'tcptest2'}, - ], - 'json' => [ - {'msg' => {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", 'expected' => 'tcptest1'}, - {'msg' => {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", 'expected' => 456}, - ] - }.each { |format, test_cases| - define_method("test_msg_size_#{format}") do - d = create_driver(BASE_CONFIG + "format #{format}") - tests = test_cases + test_case_data = { + 'none' => { + 'format' => 'none', + 'payloads' => [ "tcptest1\n", "tcptest2\n" ], + 'expecteds' => [ + {'message' => 'tcptest1'}, + {'message' => 'tcptest2'}, + ], + }, + 'json' => { + 'format' => 'json', + 'payloads' => [ + {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", + {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", + ], + 'expecteds' => [ + {'k' => 123, 'message' => 'tcptest1'}, + {'k' => 'tcptest2', 'message' => 456} + ], + }, + } + + data(test_case_data) + test 'test_msg_size' do |data| + format = data['format'] + payloads = data['payloads'] + expecteds = data['expecteds'] - d.run do - tests.each {|test| - TCPSocket.open('127.0.0.1', PORT) do |s| - s.send(test['msg'], 0) - end - } - sleep 1 + d = create_driver(BASE_CONFIG + "format #{format}") + d.run(expect_records: 2) do + payloads.each do |payload| + create_tcp_socket('127.0.0.1', PORT) do |sock| + sock.send(payload, 0) + end end + end - compare_test_result(d.emits, tests) + assert_equal 2, d.events.size + expecteds.each_with_index do |expected_record, i| + assert_equal "tcp", d.events[i][0] + assert d.events[i][1].is_a?(Fluent::EventTime) + assert_equal expected_record, d.events[i][2] end + end - define_method("test_msg_size_with_same_connection_#{format}") do - d = create_driver(BASE_CONFIG + "format #{format}") - tests = test_cases + data(test_case_data) + test 'test data in a connection' do |data| + format = data['format'] + payloads = data['payloads'] + expecteds = data['expecteds'] - d.run do - TCPSocket.open('127.0.0.1', PORT) do |s| - tests.each {|test| - s.send(test['msg'], 0) - } + d = create_driver(BASE_CONFIG + "format #{format}") + d.run(expect_records: 2) do + create_tcp_socket('127.0.0.1', PORT) do |sock| + payloads.each do |payload| + sock.send(payload, 0) end - sleep 1 end - - compare_test_result(d.emits, tests) end - } - def compare_test_result(emits, tests) - assert_equal(2, emits.size) - emits.each_index {|i| - assert_equal(tests[i]['expected'], emits[i][2]['message']) - assert(emits[i][1].is_a?(Fluent::EventTime)) - } + assert_equal 2, d.events.size + expecteds.each_with_index do |expected_record, i| + assert_equal "tcp", d.events[i][0] + assert d.events[i][1].is_a?(Fluent::EventTime) + assert_equal expected_record, d.events[i][2] + end end end diff --git a/test/plugin/test_in_udp.rb b/test/plugin/test_in_udp.rb index 01c47c1b8a..d74588b77e 100755 --- a/test/plugin/test_in_udp.rb +++ b/test/plugin/test_in_udp.rb @@ -1,20 +1,8 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_udp' class UdpInputTest < Test::Unit::TestCase - class << self - def startup - socket_manager_path = ServerEngine::SocketManager::Server.generate_path - @server = ServerEngine::SocketManager::Server.open(socket_manager_path) - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s - end - - def shutdown - @server.close - end - end - def setup Fluent::Test.setup end @@ -34,88 +22,121 @@ def setup ! def create_driver(conf) - Fluent::Test::InputTestDriver.new(Fluent::UdpInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::UdpInput).configure(conf) end - def test_configure - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + def create_udp_socket(host, port) + u = if IPAddr.new(IPSocket.getaddress(host)).ipv4? + UDPSocket.new(Socket::AF_INET) + else + UDPSocket.new(Socket::AF_INET6) + end + u.connect(host, port) + if block_given? + begin + yield u + ensure + u.close rescue nil + end + else + u + end + end - configs.each_pair { |k, v| - d = create_driver(v) - assert_equal PORT, d.instance.port - assert_equal k, d.instance.bind - assert_equal 4096, d.instance.body_size_limit - } + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'configure' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? + + d = create_driver(conf) + assert_equal PORT, d.instance.port + assert_equal bind, d.instance.bind + assert_equal 4096, d.instance.body_size_limit end - def test_time_format - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'time_format' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? - configs.each_pair { |k, v| - d = create_driver(v) + d = create_driver(conf) - tests = [ - {'msg' => '[Sep 11 00:00:00] localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 11 00:00:00', '%b %d %H:%M:%S'))}, - {'msg' => '[Sep 1 00:00:00] localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 1 00:00:00', '%b %d %H:%M:%S'))}, - ] + tests = [ + {'msg' => '[Sep 11 00:00:00] localhost logger: foo', 'expected' => event_time('Sep 11 00:00:00', format: '%b %d %H:%M:%S')}, + {'msg' => '[Sep 1 00:00:00] localhost logger: foo', 'expected' => event_time('Sep 1 00:00:00', format: '%b %d %H:%M:%S')}, + ] - d.run do - u = Fluent::SocketUtil.create_udp_socket(k) - u.connect(k, PORT) - tests.each {|test| + d.run(expect_records: 2) do + create_udp_socket(bind, PORT) do |u| + tests.each do |test| u.send(test['msg'], 0) - } - u.close - sleep 1 + end end + end - emits = d.emits - emits.each_index {|i| - assert_equal_event_time(tests[i]['expected'], emits[i][1]) - } - } - + events = d.events + tests.each_with_index do |t, i| + assert_equal_event_time(t['expected'], events[i][1]) + end end - { - 'none' => [ - {'msg' => "tcptest1\n", 'expected' => 'tcptest1'}, - {'msg' => "tcptest2\n", 'expected' => 'tcptest2'}, - ], - 'json' => [ - {'msg' => {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", 'expected' => 'tcptest1'}, - {'msg' => {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", 'expected' => 456}, - ], - '/^\\[(?<time>[^\\]]*)\\] (?<message>.*)/' => [ - {'msg' => '[Sep 10 00:00:00] localhost: ' + 'x' * 100 + "\n", 'expected' => 'localhost: ' + 'x' * 100}, - {'msg' => '[Sep 10 00:00:00] localhost: ' + 'x' * 1024 + "\n", 'expected' => 'localhost: ' + 'x' * 1024}, - ] - }.each { |format, test_cases| - define_method("test_msg_size_#{format[0] == '/' ? 'regexp' : format}") do - d = create_driver(BASE_CONFIG + "format #{format}") - tests = test_cases - - d.run do - u = UDPSocket.new - u.connect('127.0.0.1', PORT) - tests.each { |test| - u.send(test['msg'], 0) - } - u.close - sleep 1 + data( + 'none' => { + 'format' => 'none', + 'payloads' => ["tcptest1\n", "tcptest2\n"], + 'expecteds' => [ + {"message" => "tcptest1"}, + {"message" => "tcptest2"}, + ], + }, + 'json' => { + 'format' => 'json', + 'payloads' => [ + {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", + {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", + ], + 'expecteds' => [ + {'k' => 123, 'message' => 'tcptest1'}, + {'k' => 'tcptest2', 'message' => 456}, + ], + }, + 'regexp' => { + 'format' => '/^\\[(?<time>[^\\]]*)\\] (?<message>.*)/', + 'payloads' => [ + '[Sep 10 00:00:00] localhost: ' + 'x' * 100 + "\n", + '[Sep 10 00:00:00] localhost: ' + 'x' * 1024 + "\n" + ], + 'expecteds' => [ + {"message" => 'localhost: ' + 'x' * 100}, + {"message" => 'localhost: ' + 'x' * 1024}, + ], + }, + ) + test 'message size with format' do |data| + format = data['format'] + payloads = data['payloads'] + expecteds = data['expecteds'] + + d = create_driver(BASE_CONFIG + "format #{format}") + d.run(expect_records: 2) do + create_udp_socket('127.0.0.1', PORT) do |u| + payloads.each do |payload| + u.send(payload, 0) + end end - - compare_test_result(d.emits, tests) end - } - def compare_test_result(emits, tests) - assert_equal(2, emits.size) - emits.each_index {|i| - assert_equal(tests[i]['expected'], emits[i][2]['message']) - assert(emits[i][1].is_a?(Fluent::EventTime)) - } + assert_equal 2, d.events.size + expecteds.each_with_index do |expected_record, i| + assert_equal "udp", d.events[i][0] + assert d.events[i][1].is_a?(Fluent::EventTime) + assert_equal expected_record, d.events[i][2] + end end end From 5c6ffceb632d5c6a538fca00e8c1dfdde895ccb0 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 16 Nov 2016 18:24:39 +0900 Subject: [PATCH 04/29] update implementation using serever plugin helper --- lib/fluent/plugin/in_forward.rb | 221 ++++++++------------------------ test/plugin/test_in_forward.rb | 88 ++----------- 2 files changed, 65 insertions(+), 244 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index d89cef55dc..4cc3323369 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -20,10 +20,6 @@ require 'yajl' require 'digest' -require 'fluent/plugin/socket_util' -require 'fcntl' -require 'cool.io' - module Fluent::Plugin class ForwardInput < Input Fluent::Plugin.register_input('forward', self) @@ -31,7 +27,7 @@ class ForwardInput < Input # See the wiki page below for protocol specification # https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1 - helpers :event_loop + helpers :server LISTEN_PORT = 24224 @@ -39,12 +35,15 @@ class ForwardInput < Input config_param :port, :integer, default: LISTEN_PORT desc 'The bind address to listen to.' config_param :bind, :string, default: '0.0.0.0' + config_param :backlog, :integer, default: nil # SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src desc 'The timeout time used to set linger option.' config_param :linger_timeout, :integer, default: 0 # This option is for Cool.io's loop wait timeout to avoid loop stuck at shutdown. Almost users don't need to change this value. config_param :blocking_timeout, :time, default: 0.5 + desc 'Try to resolve hostname from IP addresses or not.' + config_param :resolve_hostname, :bool, default: nil desc 'Connections will be disconnected right after receiving first message if this value is true.' config_param :deny_keepalive, :bool, default: false @@ -91,6 +90,15 @@ class ForwardInput < Input def configure(conf) super + if @source_hostname_key + # TODO: add test + if @resolve_hostname.nil? + @resolve_hostname = true + elsif !@resolve_hostname # user specifies "false" in config + raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key" + end + end + if @security if @security.user_auth && @security.users.empty? raise Fluent::ConfigError, "<user> sections required if user_auth enabled" @@ -131,44 +139,35 @@ def configure(conf) @lsock = @usock = nil end + HEARTBEAT_UDP_PAYLOAD = "\0" + def start super - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + server_create_connection( + :in_forward_server, @port, + bind: @bind, + shared: false, + resolve_name: @resolve_hostname, + linger_timeout: @linger_timeout, + backlog: @backlog, + &method(:handle_connection) + ) + + server_create(:in_forward_server_udp_heartbeat, @port, shared: false, proto: :udp, bind: @bind, resolve_name: @resolve_hostname, max_bytes: 128) do |data, sock| + log.trace "heartbeat udp data arrived", host: sock.remote_host, port: sock.remote_port, data: data + begin + sock.write HEARTBEAT_UDP_PAYLOAD + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + log.trace "error while heartbeat response", host: sock.remote_host, error: e + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - - @lsock = listen(client) - event_loop_attach(@lsock) - - @usock = client.listen_udp(@bind, @port) - @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) - @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request)) - event_loop_attach(@hbr) - end - - def close - @lsock.close if @lsock - @usock.close if @usock - super end - def listen(client) - log.info "listening fluent socket on #{@bind}:#{@port}" - sock = client.listen_tcp(@bind, @port) - s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:handle_connection)) - s.listen(@backlog) unless @backlog.nil? - s - end - - private - def handle_connection(conn) send_data = ->(serializer, data){ conn.write serializer.call(data) } - log.trace "connected fluent socket", address: conn.remote_addr, port: conn.remote_port + log.trace "connected fluent socket", addr: conn.remote_addr, port: conn.remote_port state = :established nonce = nil user_auth_salt = nil @@ -182,7 +181,7 @@ def handle_connection(conn) state = :pingpong end - log.trace "accepted fluent socket", address: conn.remote_addr, port: conn.remote_port + log.trace "accepted fluent socket", addr: conn.remote_addr, port: conn.remote_port read_messages(conn) do |msg, chunk_size, serializer| case state @@ -198,15 +197,11 @@ def handle_connection(conn) log.debug "connection established", address: conn.remote_addr, port: conn.remote_port state = :established when :established - options = on_message(msg, chunk_size, conn.remote_addr) + options = on_message(msg, chunk_size, conn.remote_host) if options && r = response(options) - send_data.call(serializer, r) log.trace "sent response to fluent socket", address: conn.remote_addr, response: r - if @deny_keepalive - conn.on_write_complete do - conn.close - end - end + conn.on_write_complete{ conn.close } if @deny_keepalive + send_data.call(serializer, r) else if @deny_keepalive conn.close @@ -222,7 +217,7 @@ def read_messages(conn, &block) feeder = nil serializer = nil bytes = 0 - conn.on_data do |data| + conn.data do |data| # only for first call of callback unless feeder first = data[0] @@ -258,7 +253,7 @@ def response(option) nil end - def on_message(msg, chunk_size, peeraddr) + def on_message(msg, chunk_size, remote_host) if msg.nil? # for future TCP heartbeat_request return @@ -266,7 +261,7 @@ def on_message(msg, chunk_size, peeraddr) # TODO: raise an exception if broken chunk is generated by recoverable situation unless msg.is_a?(Array) - log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg + log.warn "incoming chunk is broken:", host: remote_host, msg: msg return end @@ -274,10 +269,10 @@ def on_message(msg, chunk_size, peeraddr) entries = msg[1] if @chunk_size_limit && (chunk_size > @chunk_size_limit) - log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size + log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: remote_host, limit: @chunk_size_limit, size: chunk_size return elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit) - log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size + log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: remote_host, limit: @chunk_size_warn_limit, size: chunk_size end case entries @@ -287,14 +282,14 @@ def on_message(msg, chunk_size, peeraddr) size = (option && option['size']) || 0 es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream es = es_class.new(entries, nil, size.to_i) - es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event - es = add_source_host(es, peeraddr[2]) if @source_hostname_key + es = check_and_skip_invalid_event(tag, es, remote_host) if @skip_invalid_event + es = add_source_host(es, remote_host) if @source_hostname_key router.emit_stream(tag, es) when Array # Forward es = if @skip_invalid_event - check_and_skip_invalid_event(tag, entries, peeraddr) + check_and_skip_invalid_event(tag, entries, remote_host) else es = Fluent::MultiEventStream.new entries.each { |e| @@ -306,7 +301,7 @@ def on_message(msg, chunk_size, peeraddr) } es end - es = add_source_host(es, peeraddr[2]) if @source_hostname_key + es = add_source_host(es, remote_host) if @source_hostname_key router.emit_stream(tag, es) option = msg[2] @@ -315,29 +310,31 @@ def on_message(msg, chunk_size, peeraddr) time = msg[1] record = msg[2] if @skip_invalid_event && invalid_event?(tag, time, record) - log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record + log.warn "got invalid event and drop it:", host: remote_host, tag: tag, time: time, record: record return msg[3] # retry never succeeded so return ack and drop incoming event. end return if record.nil? time = Fluent::Engine.now if time.to_i == 0 - record[@source_hostname_key] = peeraddr[2] if @source_hostname_key + record[@source_hostname_key] = remote_host if @source_hostname_key router.emit(tag, time, record) option = msg[3] end # return option for response option + ensure + p(here: "ensure of on_message", error: $!) if $! end def invalid_event?(tag, time, record) !((time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)) && record.is_a?(Hash) && tag.is_a?(String)) end - def check_and_skip_invalid_event(tag, es, peeraddr) + def check_and_skip_invalid_event(tag, es, remote_host) new_es = Fluent::MultiEventStream.new es.each { |time, record| if invalid_event?(tag, time, record) - log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record + log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record next end new_es.add(time, record) @@ -354,11 +351,6 @@ def add_source_host(es, host) new_es end - def source_message(peeraddr) - _, port, host, addr = peeraddr - "host: #{host}, addr: #{addr}, port: #{port}" - end - def select_authenticate_users(node, username) if node.nil? || node[:users].empty? @security.users.select{|u| u.username == username} @@ -424,114 +416,5 @@ def generate_pong(auth_result, reason_or_salt, nonce, shared_key) shared_key_digest_hex = Digest::SHA512.new.update(reason_or_salt).update(@security.self_hostname).update(nonce).update(shared_key).hexdigest ['PONG', true, '', @security.self_hostname, shared_key_digest_hex] end - - class Handler < Coolio::Socket - attr_reader :protocol, :remote_port, :remote_addr, :remote_host - - PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"] - - def initialize(io, linger_timeout, log, on_connect_callback) - super(io) - - @peeraddr = nil - if io.is_a?(TCPSocket) # for unix domain socket support in the future - @peeraddr = (io.peeraddr rescue PEERADDR_FAILED) - opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; } - io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) - end - - ### TODO: disabling name rev resolv - proto, port, host, addr = ( io.peeraddr rescue PEERADDR_FAILED ) - if addr == '?' - port, addr = *Socket.unpack_sockaddr_in(io.getpeername) rescue nil - end - @protocol = proto - @remote_port = port - @remote_addr = addr - @remote_host = host - @writing = false - @closing = false - @mutex = Mutex.new - - @chunk_counter = 0 - @on_connect_callback = on_connect_callback - @log = log - @log.trace { - begin - remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) - rescue - remote_port = nil - remote_addr = nil - end - [ "accepted fluent socket", {address: remote_addr, port: remote_port, instance: self.object_id} ] - } - end - - def on_connect - @on_connect_callback.call(self) - end - - # API to register callback for data arrival - def on_data(&callback) - @on_read_callback = callback - end - - def on_read(data) - @on_read_callback.call(data) - rescue => e - @log.error "unexpected error on reading data from client", address: @remote_addr, error: e - @log.error_backtrace - close - end - - def on_write_complete - closing = @mutex.synchronize { - @writing = false - @closing - } - if closing - close - end - end - - def close - writing = @mutex.synchronize { - @closing = true - @writing - } - unless writing - super - end - end - end - - class HeartbeatRequestHandler < Coolio::IO - def initialize(io, callback) - super(io) - @io = io - @callback = callback - end - - def on_readable - begin - msg, addr = @io.recvfrom(1024) - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR - return - end - host = addr[3] - port = addr[1] - @callback.call(host, port, msg) - rescue - # TODO log? - end - end - - def on_heartbeat_request(host, port, msg) - #log.trace "heartbeat request from #{host}:#{port}" - begin - @usock.send "\0", 0, host, port - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR - end - end end end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 50b9e8ac0c..8d24b8ce21 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -59,13 +59,6 @@ def create_driver(conf=CONFIG) end sub_test_case '#configure' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'simple' do @d = d = create_driver assert_equal PORT, d.instance.port @@ -88,18 +81,7 @@ def create_driver(conf=CONFIG) end end - def connect - TCPSocket.new('127.0.0.1', PORT) - end - sub_test_case 'message' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'time' do time = event_time("2011-01-02 13:14:15 UTC") begin @@ -221,6 +203,7 @@ def connect d.run(expect_records: records.length, timeout: 20) do records.each {|tag, _time, record| send_data [tag, _time, record].to_json + "\n" + sleep 1 } end @@ -229,13 +212,6 @@ def connect end sub_test_case 'forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -345,13 +321,6 @@ def connect end sub_test_case 'packed forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -464,13 +433,6 @@ def connect end sub_test_case 'compressed packed forward' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'set_compress_to_option' do @d = d = create_driver @@ -528,13 +490,6 @@ def connect end sub_test_case 'warning' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'send_large_chunk_warning' do @d = d = create_driver(CONFIG + %[ chunk_size_warn_limit 16M @@ -551,7 +506,7 @@ def connect d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -565,7 +520,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ + line =~ / tag="test.tag" host="127.0.0.1" limit=16777216 size=16777501/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -583,7 +538,7 @@ def connect d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -591,7 +546,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{ |line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ + line =~ / tag="test.tag" host="127.0.0.1" limit=16777216 size=16777501/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -613,7 +568,7 @@ def connect # d.run => send_data d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| - d.instance.send(:on_message, obj, chunk.size, PEERADDR) + d.instance.send(:on_message, obj, chunk.size, '127.0.0.1') end end @@ -625,7 +580,7 @@ def connect logs = d.instance.log.logs assert_equal 1, logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_limit', dropped:/ && - line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=33554432 size=33554989/ + line =~ / tag="test.tag" host="127.0.0.1" limit=33554432 size=33554989/ }.size, "large chunk warning is not logged" d.instance_shutdown @@ -638,7 +593,7 @@ def connect # d.run => send_data d.run(shutdown: false) do - d.instance.send(:on_message, data, 1000000000, PEERADDR) + d.instance.send(:on_message, data, 1000000000, '127.0.0.1') end # check emitted data @@ -647,7 +602,7 @@ def connect # check log logs = d.instance.log.logs assert_equal 1, logs.select{|line| - line =~ / \[warn\]: incoming chunk is broken: source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" msg=#{data.inspect}/ + line =~ / \[warn\]: incoming chunk is broken: host="127.0.0.1" msg=#{data.inspect}/ }.size, "should not accept broken chunk" d.instance_shutdown @@ -655,13 +610,6 @@ def connect end sub_test_case 'respond to required ack' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -824,13 +772,6 @@ def connect end sub_test_case 'not respond without required ack' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - data(tcp: { config: CONFIG, options: { @@ -1068,6 +1009,10 @@ def simulate_auth_sequence(io, shared_key=SHARED_KEY, username=USER_NAME, passwo true end + def connect + TCPSocket.new('127.0.0.1', PORT) + end + # Data ordering is not assured: # Records in different sockets are processed on different thread, so its scheduling make effect # on order of emitted records. @@ -1089,13 +1034,6 @@ def send_data(data, try_to_receive_response: false, response_timeout: 5, auth: f end sub_test_case 'source_hostname_key feature' do - setup do - Fluent::Test::StartupShutdown.setup - end - teardown do - Fluent::Test::StartupShutdown.teardown - end - test 'message protocol with source_hostname_key' do execute_test { |events| events.each { |tag, time, record| From 9fb9c3d151842660873853bf0b7971a1adcfc065 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 16 Nov 2016 19:01:30 +0900 Subject: [PATCH 05/29] fix buggy test code --- test/plugin/test_out_forward.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index b97cd0e494..deb364f1be 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -264,7 +264,7 @@ def test_send_comprssed_message_pack_stream_if_compress_is_gzip end def test_send_to_a_node_supporting_responses - target_input_driver = create_target_input_driver(response_stub: true) + target_input_driver = create_target_input_driver d = create_driver(CONFIG + %[flush_interval 1s]) From cca6e9a32f3eba1d3fa5ec3cd8b0de69fa8b8100 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 16 Nov 2016 19:39:50 +0900 Subject: [PATCH 06/29] On windows, @socket_manager_path is a port number --- lib/fluent/test/driver/base.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb index 1a5fbe39c3..5f52af883b 100644 --- a/lib/fluent/test/driver/base.rb +++ b/lib/fluent/test/driver/base.rb @@ -107,7 +107,9 @@ def run(timeout: nil, start: true, shutdown: true, &block) def instance_start if @instance.respond_to?(:server_wait_until_start) @socket_manager_path = ServerEngine::SocketManager::Server.generate_path - FileUtils.rm_f @socket_manager_path if File.exist?(@socket_manager_path) + if @socket_manager_path.is_a?(String) && File.exist?(@socket_manager_path) + FileUtils.rm_f @socket_manager_path + end @socket_manager_server = ServerEngine::SocketManager::Server.open(@socket_manager_path) ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s end @@ -164,7 +166,9 @@ def instance_shutdown if @socket_manager_server @socket_manager_server.close - FileUtils.rm_f @socket_manager_path if File.exist?(@socket_manager_path) + if @socket_manager_server.is_a?(String) && File.exist?(@socket_manager_path) + FileUtils.rm_f @socket_manager_path + end end end From faa31373c41e801efd71ba58a847d4193a03565d Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 16 Nov 2016 20:19:04 +0900 Subject: [PATCH 07/29] close-on-exec is unavailable on windows --- lib/fluent/plugin_helper/server.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 8b4660e1b6..8d30875295 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -184,7 +184,9 @@ def server_create_tcp_socket(shared, bind, port) else TCPServer.new(bind, port) # this method call can create sockets for AF_INET6 end - sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec + unless Fluent.windows? + sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec + end sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock sock end @@ -198,7 +200,9 @@ def server_create_udp_socket(shared, bind, port) usock.bind(bind, port) usock end - sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec + unless Fluent.windows? + sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec + end sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock sock end From ebc8087b8ccab4ca959baffca5931aa010651084 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 16 Nov 2016 20:38:22 +0900 Subject: [PATCH 08/29] fix not to set close-on-exec, because it's set by ruby --- lib/fluent/plugin_helper/server.rb | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 8d30875295..a374b1f205 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -184,9 +184,7 @@ def server_create_tcp_socket(shared, bind, port) else TCPServer.new(bind, port) # this method call can create sockets for AF_INET6 end - unless Fluent.windows? - sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec - end + # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows) sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock sock end @@ -200,9 +198,7 @@ def server_create_udp_socket(shared, bind, port) usock.bind(bind, port) usock end - unless Fluent.windows? - sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec - end + # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows) sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock sock end From 6dca0481b93b10f0e46b71871a87dd438021287c Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Wed, 16 Nov 2016 21:04:00 +0900 Subject: [PATCH 09/29] remove mixin not required anymore --- test/plugin/test_out_forward.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index deb364f1be..ea7cb26ae8 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -1,6 +1,5 @@ require_relative '../helper' require 'fluent/test/driver/output' -require 'fluent/test/startup_shutdown' require 'fluent/plugin/out_forward' require 'flexmock/test_unit' @@ -8,8 +7,6 @@ require 'fluent/plugin/in_forward' class ForwardOutputTest < Test::Unit::TestCase - extend Fluent::Test::StartupShutdown - def setup Fluent::Test.setup end From 45ac6a6981130879d4a37d19ca5dc9b85c1862c6 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 17 Nov 2016 17:10:23 +0900 Subject: [PATCH 10/29] set do_not_reverse_lookup to sockets to listen and after accepted --- lib/fluent/plugin_helper/server.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index a374b1f205..58114c6f22 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -143,6 +143,7 @@ def server_attach(title, port, bind, shared, server) def server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) sock = server_create_tcp_socket(shared, bind, port) + sock.do_not_reverse_lookup = !resolve_name server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, resolve_name, linger_timeout, @log, @under_plugin_development, block) server.listen(backlog) if backlog server @@ -340,8 +341,8 @@ def initialize(sock, resolve_name, linger_timeout, log, under_plugin_development sock_opt = [1, linger_timeout].pack(SOCK_OPT_FORMAT) sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, sock_opt) + sock.do_not_reverse_lookup = !resolve_name - @resolve_name = resolve_name @log = log @connect_callback = connect_callback From d4d5f3583ede65677f8a349ff5d761929544fcad Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 10:40:21 +0900 Subject: [PATCH 11/29] return watcher when attached, and detach watchers attached by plugin in shutdown --- lib/fluent/plugin_helper/event_loop.rb | 11 +++++++++-- lib/fluent/plugin_helper/timer.rb | 1 + 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin_helper/event_loop.rb b/lib/fluent/plugin_helper/event_loop.rb index 909afdb294..646a106a39 100644 --- a/lib/fluent/plugin_helper/event_loop.rb +++ b/lib/fluent/plugin_helper/event_loop.rb @@ -36,6 +36,8 @@ module EventLoop def event_loop_attach(watcher) @_event_loop_mutex.synchronize do @_event_loop.attach(watcher) + @_event_loop_attached_watchers << watcher + watcher end end @@ -58,6 +60,7 @@ def initialize @_event_loop_mutex = Mutex.new # plugin MAY configure loop run timeout in #configure @_event_loop_run_timeout = EVENT_LOOP_RUN_DEFAULT_TIMEOUT + @_event_loop_attached_watchers = [] end def start @@ -67,7 +70,7 @@ def start thread_create :event_loop do begin default_watcher = DefaultWatcher.new - @_event_loop.attach(default_watcher) + event_loop_attach(default_watcher) @_event_loop_running = true @_event_loop.run(@_event_loop_run_timeout) # this method blocks ensure @@ -78,7 +81,11 @@ def start def shutdown @_event_loop_mutex.synchronize do - @_event_loop.watchers.each {|w| w.detach if w.attached? } + @_event_loop_attached_watchers.reverse.each do |w| + if w.attached? + w.detach + end + end end while @_event_loop_running sleep 0.1 diff --git a/lib/fluent/plugin_helper/timer.rb b/lib/fluent/plugin_helper/timer.rb index 8a1c62a638..02fac1ced8 100644 --- a/lib/fluent/plugin_helper/timer.rb +++ b/lib/fluent/plugin_helper/timer.rb @@ -36,6 +36,7 @@ def timer_execute(title, interval, repeat: true, &block) timer = TimerWatcher.new(title, interval, repeat, log, checker, &block) @_timers << title event_loop_attach(timer) + timer end def timer_running? From d766ecd4a9d09ad610b8781d55087e8c9fa9f1e5 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 10:43:14 +0900 Subject: [PATCH 12/29] break #shutdown by timeout if any event watcher blocks infinitly --- lib/fluent/plugin/output.rb | 2 +- lib/fluent/plugin_helper/event_loop.rb | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 504f113d95..8108800687 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1176,7 +1176,7 @@ def flush_thread_run(state) flush_thread_interval = @buffer_config.flush_thread_interval # If the given clock_id is not supported, Errno::EINVAL is raised. - clock_id = Process::CLOCK_MONOTONIC rescue Process::CLOCK_MONOTONIC_RAW + clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval while !self.after_started? && !self.stopped? diff --git a/lib/fluent/plugin_helper/event_loop.rb b/lib/fluent/plugin_helper/event_loop.rb index 646a106a39..5b6618c8a5 100644 --- a/lib/fluent/plugin_helper/event_loop.rb +++ b/lib/fluent/plugin_helper/event_loop.rb @@ -30,6 +30,8 @@ module EventLoop # terminate: initialize internal state EVENT_LOOP_RUN_DEFAULT_TIMEOUT = 0.5 + EVENT_LOOP_SHUTDOWN_TIMEOUT = 5 + EVENT_LOOP_CLOCK_ID = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC attr_reader :_event_loop # for tests @@ -87,7 +89,13 @@ def shutdown end end end + timeout_at = Process.clock_gettime(EVENT_LOOP_CLOCK_ID) + EVENT_LOOP_SHUTDOWN_TIMEOUT while @_event_loop_running + if Process.clock_gettime(EVENT_LOOP_CLOCK_ID) >= timeout_at + log.warn "event loop does NOT exit until hard timeout." + raise "event loop does NOT exit until hard timeout." if @under_plugin_development + break + end sleep 0.1 end From faa51a8cb8619f9c289af1c6820973e426ff06c1 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 10:46:14 +0900 Subject: [PATCH 13/29] add argument checks --- lib/fluent/plugin_helper/server.rb | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 58114c6f22..053b4af85f 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -62,11 +62,21 @@ def server_wait_until_stop # end # end def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block) - raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol + raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) + raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) + raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto) + raise ArgumentError, "BUG: block not specified which handles connection" unless block_given? raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1 + if proto != :tls # TLS options + raise ArgumentError, "BUG: certopts is available only for tls" if certopts + end + if proto != :tcp && proto != :tls # TCP/TLS options + raise ArgumentError, "BUG: linger_timeout is available for tcp/tls" if linger_timeout != 0 + end + case proto when :tcp server = server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) @@ -97,12 +107,27 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: # # ... # end def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, max_bytes: nil, flags: 0, &callback) - raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol + raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) + raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) raise ArgumentError, "BUG: block not specified which handles received data" unless block_given? raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2 + if proto != :tls # TLS options + raise ArgumentError, "BUG: certopts is available only for tls" if certopts + end + if proto != :tcp && proto != :tls # TCP/TLS options + raise ArgumentError, "BUG: linger_timeout is available for tcp/tls" if linger_timeout != 0 + end + if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections + raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog + end + if proto != :udp # UDP options + raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes + raise ArgumentError, "BUG: flags is available only for udp" if flags != 0 + end + case proto when :tcp server = server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog) do |conn| From 48fe5e6b62bab948b919e5ebf2e867d0a33639be Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 10:48:54 +0900 Subject: [PATCH 14/29] show protocol of created servers in dump --- lib/fluent/plugin_helper/server.rb | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 053b4af85f..ac48fcc522 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -92,7 +92,7 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: raise "unknown protocol #{proto}" end - server_attach(title, port, bind, shared, server) + server_attach(title, proto, port, bind, shared, server) end # server_create(:title, @port) do |data| @@ -148,7 +148,7 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certo raise "BUG: unknown protocol #{proto}" end - server_attach(title, port, bind, shared, server) + server_attach(title, proto, port, bind, shared, server) end def server_create_tcp(title, port, **kwargs, &callback) @@ -159,10 +159,18 @@ def server_create_udp(title, port, **kwargs, &callback) server_create(title, port, proto: :udp, **kwargs, &callback) end - ServerInfo = Struct.new(:title, :port, :bind, :shared, :server) + def server_create_tls(title, port, **kwargs, &callback) + server_create(title, port, proto: :tls, **kwargs, &callback) + end + + def server_create_unix(title, port, **kwargs, &callback) + server_create(title, port, proto: :unix, **kwargs, &callback) + end + + ServerInfo = Struct.new(:title, :proto, :port, :bind, :shared, :server) - def server_attach(title, port, bind, shared, server) - @_servers << ServerInfo.new(title, port, bind, shared, server) + def server_attach(title, proto, port, bind, shared, server) + @_servers << ServerInfo.new(title, proto, port, bind, shared, server) event_loop_attach(server) end From 84d82b7824e8567a9d31c102b5fe20b203b31d62 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 10:50:41 +0900 Subject: [PATCH 15/29] remove all callbacks for udp server, add established connection handling --- lib/fluent/plugin_helper/server.rb | 119 ++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 35 deletions(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index ac48fcc522..ae82bf4530 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -177,7 +177,12 @@ def server_attach(title, proto, port, bind, shared, server) def server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) sock = server_create_tcp_socket(shared, bind, port) sock.do_not_reverse_lookup = !resolve_name - server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, resolve_name, linger_timeout, @log, @under_plugin_development, block) + close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } } + server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, close_callback, resolve_name, linger_timeout, @log, @under_plugin_development, block) do |conn| + @_server_mutex.synchronize do + @_server_connections << conn + end + end server.listen(backlog) if backlog server end @@ -185,16 +190,35 @@ def server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_ti def initialize super @_servers = [] + @_server_connections = [] + @_server_mutex = Mutex.new end - def close - @_servers.each do |si| - si.server.close rescue nil + def shutdown + @_server_connections.each do |conn| + conn.close rescue nil + end + @_server_mutex.synchronize do + @_servers.each do |si| + si.server.detach if si.server.attached? + end end super end + def close + @_server_connections.each do |conn| + conn.close rescue nil + end + @_server_mutex.synchronize do + @_servers.each do |si| + si.server.close rescue nil + end + end + super + end + def terminate @_servers = [] super @@ -260,16 +284,21 @@ def remote_port @sock.peeraddr[1] end - def send(data, flag) - @sock.send(data, flag) + def send(data, flags = 0) + @sock.send(data, flags) end def write(data) - @sock.write(data) + raise "not implemented here" end def close @sock.close + # close cool.io socket in another thread, not to make deadlock + # for flushing @_write_buffer when conn.close is called in callback + # ::Thread.new{ + # @sock.close + # } end def data(&callback) @@ -277,16 +306,18 @@ def data(&callback) end def on(event, &callback) - raise "BUG: this event is disabled for #{@server_type}" unless @enabled_events.include?(event) + raise "BUG: this event is disabled for #{@server_type}: #{event}" unless @enabled_events.include?(event) case event when :data @sock.data(&callback) when :write_complete - @sock.on_write_complete(&callback) - when :before_close - @sock.before_close(&callback) + cb = ->(){ + callback.call(self) + } + @sock.on_write_complete(&cb) when :close - @sock.on_close(&callback) + cb = ->(){ callback.call(self) } + @sock.on_close(&cb) else raise "BUG: unknown event: #{event}" end @@ -295,17 +326,34 @@ def on(event, &callback) class TCPCallbackSocket < CallbackSocket def initialize(sock) - super("tcp", sock, [:data, :write_complete, :before_close, :close]) + super("tcp", sock, [:data, :write_complete, :close]) + end + + def write(data) + @sock.write(data) end end class UDPCallbackSocket < CallbackSocket - def initialize(sock) - super("udp", sock, [:write_complete]) + def initialize(sock, peeraddr) + super("udp", sock, []) + @peeraddr = peeraddr + end + + def remote_addr + @peeraddr[3] + end + + def remote_host + @peeraddr[2] + end + + def remote_port + @peeraddr[1] end def write(data) - self.send(data, 0) + @sock.send(data, 0, @peeraddr[3], @peeraddr[1]) end end @@ -353,10 +401,7 @@ def on_readable_with_sock rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR return end - sock = UDPSocket.new(addr[0]) # Address family: "AF_INET", "AF_INET6" - sock.do_not_reverse_lookup = !@resolve_name - sock.connect(addr[3], addr[1]) - @callback.call(data, UDPCallbackSocket.new(sock)) + @callback.call(data, UDPCallbackSocket.new(@sock, addr)) rescue => e @log.error "unexpected error in processing UDP data", error: e @log.error_backtrace @@ -367,26 +412,31 @@ def on_readable_with_sock class TCPServer < Coolio::TCPSocket SOCK_OPT_FORMAT = 'I!I!' # { int l_onoff; int l_linger; } - def initialize(sock, resolve_name, linger_timeout, log, under_plugin_development, connect_callback) + def initialize(sock, close_callback, resolve_name, linger_timeout, log, under_plugin_development, connect_callback) raise ArgumentError, "socket must be a TCPSocket" unless sock.is_a?(TCPSocket) - super(sock) - sock_opt = [1, linger_timeout].pack(SOCK_OPT_FORMAT) sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, sock_opt) sock.do_not_reverse_lookup = !resolve_name - @log = log - @connect_callback = connect_callback + @_handler_socket = sock + super(sock) + @log = log @under_plugin_development = under_plugin_development + @connect_callback = connect_callback @data_callback = nil + @close_callback = close_callback + + @callback_connection = nil @closing = false + @mutex = Mutex.new # to serialize #write and #close end def data(&callback) + raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read) @data_callback = callback on_read_impl = case callback.arity when 1 then :on_read_without_connection @@ -398,15 +448,14 @@ def data(&callback) end def write(data) - raise IOError, "server TCP connection is already going to be closed" if @closing @mutex.synchronize do super end end def on_connect - conn = TCPCallbackSocket.new(self) - @connect_callback.call(conn) + @callback_connection = TCPCallbackSocket.new(self) + @connect_callback.call(@callback_connection) unless @data_callback raise "connection callback must call #data to set data callback" end @@ -422,7 +471,7 @@ def on_read_without_connection(data) end def on_read_with_connection(data) - @data_callback.call(data, self) + @data_callback.call(data, @callback_connection) rescue => e @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e @log.error_backtrace @@ -430,12 +479,12 @@ def on_read_with_connection(data) raise if @under_plugin_development end - def close(force = false) - @closing = true - if force - super() - else - @mutex.synchronize{ super() } + def close + @mutex.synchronize do + return if @closing + @closing = true + @close_callback.call(self) + super end end end From 93b5e009f38d82ce4c56390b63dde6caf704532c Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 10:51:03 +0900 Subject: [PATCH 16/29] add tests of server plugin helper --- test/plugin_helper/test_server.rb | 890 ++++++++++++++++++++++++++++++ 1 file changed, 890 insertions(+) create mode 100644 test/plugin_helper/test_server.rb diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb new file mode 100644 index 0000000000..a4bb8a87bc --- /dev/null +++ b/test/plugin_helper/test_server.rb @@ -0,0 +1,890 @@ +require_relative '../helper' +require 'fluent/plugin_helper/server' +require 'fluent/plugin/base' +require 'timeout' + +require 'serverengine' +require 'fileutils' + +class ServerPluginHelperTest < Test::Unit::TestCase + class Dummy < Fluent::Plugin::TestBase + helpers :server + end + + PORT = unused_port + + setup do + @socket_manager_path = ServerEngine::SocketManager::Server.generate_path + if @socket_manager_path.is_a?(String) && File.exist?(@socket_manager_path) + FileUtils.rm_f @socket_manager_path + end + @socket_manager_server = ServerEngine::SocketManager::Server.open(@socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s + + @d = Dummy.new + @d.start + @d.after_start + end + + teardown do + @d.stopped? || @d.stop + @d.before_shutdown? || @d.before_shutdown + @d.shutdown? || @d.shutdown + @d.after_shutdown? || @d.after_shutdown + @d.closed? || @d.close + @d.terminated? || @d.terminate + + @socket_manager_server.close + if @socket_manager_server.is_a?(String) && File.exist?(@socket_manager_path) + FileUtils.rm_f @socket_manager_path + end + end + + sub_test_case 'plugin instance' do + test 'can be instantiated to be able to create threads' do + d = Dummy.new + assert d.respond_to?(:_servers) + assert d._servers.empty? + + assert d.respond_to?(:server_wait_until_start) + assert d.respond_to?(:server_wait_until_stop) + assert d.respond_to?(:server_create_connection) + assert d.respond_to?(:server_create) + assert d.respond_to?(:server_create_tcp) + assert d.respond_to?(:server_create_udp) + assert d.respond_to?(:server_create_tls) + end + + test 'can be configured' do + d = Dummy.new + assert_nothing_raised do + d.configure(config_element()) + end + assert d.plugin_id + assert d.log + end + end + + # run tests for tcp, udp, tls and unix + sub_test_case '#server_create and #server_create_connection' do + methods = {server_create: :server_create, server_create_connection: :server_create_connection} + + data(methods) + test 'raise error if title is not specified or not a symbol' do |m| + assert_raise(ArgumentError.new("BUG: title must be a symbol")) do + @d.__send__(m, nil, PORT){|x| x } + end + assert_raise(ArgumentError.new("BUG: title must be a symbol")) do + @d.__send__(m, "", PORT){|x| x } + end + assert_raise(ArgumentError.new("BUG: title must be a symbol")) do + @d.__send__(m, "title", PORT){|x| x } + end + assert_nothing_raised do + @d.__send__(m, :myserver, PORT){|x| x } + end + end + + data(methods) + test 'raise error if port is not specified or not an integer' do |m| + assert_raise(ArgumentError.new("BUG: port must be an integer")) do + @d.__send__(m, :myserver, nil){|x| x } + end + assert_raise(ArgumentError.new("BUG: port must be an integer")) do + @d.__send__(m, :myserver, "1"){|x| x } + end + assert_raise(ArgumentError.new("BUG: port must be an integer")) do + @d.__send__(m, :myserver, 1.5){|x| x } + end + assert_nothing_raised do + @d.__send__(m, :myserver, PORT){|x| x } + end + end + + data(methods) + test 'raise error if block is not specified' do |m| + assert_raise(ArgumentError) do + @d.__send__(m, :myserver, PORT) + end + assert_nothing_raised do + @d.__send__(m, :myserver, PORT){|x| x } + end + end + + data(methods) + test 'creates tcp server, binds 0.0.0.0 in default' do |m| + @d.__send__(m, :myserver, PORT){|x| x } + + assert_equal 1, @d._servers.size + assert_equal :myserver, @d._servers.first.title + assert_equal PORT, @d._servers.first.port + + assert_equal :tcp, @d._servers.first.proto + assert_equal "0.0.0.0", @d._servers.first.bind + + assert{ @d._servers.first.server.is_a? Coolio::TCPServer } + assert_equal "0.0.0.0", @d._servers.first.server.instance_eval{ @listen_socket }.addr[3] + end + + data(methods) + test 'creates tcp server if specified in proto' do |m| + @d.__send__(m, :myserver, PORT, proto: :tcp){|x| x } + + assert_equal :tcp, @d._servers.first.proto + assert{ @d._servers.first.server.is_a? Coolio::TCPServer } + end + + # tests about "proto: :udp" is in #server_create + + data(methods) + test 'creates tls server if specified in proto' do |m| + # pend "not implemented yet" + end + + data(methods) + test 'creates unix server if specified in proto' do |m| + # pend "not implemented yet" + end + + data(methods) + test 'raise error if unknown protocol specified' do |m| + assert_raise(ArgumentError.new("BUG: invalid protocol name")) do + @d.__send__(m, :myserver, PORT, proto: :quic){|x| x } + end + end + + data( + 'server_create tcp' => [:server_create, :tcp], + # 'server_create tls' => [:server_create, :tls], + # 'server_create unix' => [:server_create, :unix], + 'server_create_connection tcp' => [:server_create_connection, :tcp], + # 'server_create_connection tcp' => [:server_create_connection, :tls], + # 'server_create_connection tcp' => [:server_create_connection, :unix], + ) + test 'raise error if udp options specified for tcp/tls/unix' do |(m, proto)| + assert_raise ArgumentError do + @d.__send__(m, :myserver, PORT, proto: proto, max_bytes: 128){|x| x } + end + assert_raise ArgumentError do + @d.__send__(m, :myserver, PORT, proto: proto, flags: 1){|x| x } + end + end + + data( + 'server_create udp' => [:server_create, :udp], + ) + test 'raise error if tcp/tls options specified for udp' do |(m, proto)| + assert_raise(ArgumentError.new("BUG: linger_timeout is available for tcp/tls")) do + @d.__send__(m, :myserver, PORT, proto: proto, linger_timeout: 1, max_bytes: 128){|x| x } + end + end + + data( + 'server_create udp' => [:server_create, :udp], + ) + test 'raise error if tcp/tls/unix options specified for udp' do |(m, proto)| + assert_raise(ArgumentError.new("BUG: backlog is available for tcp/tls")) do + @d.__send__(m, :myserver, PORT, proto: proto, backlog: 500){|x| x } + end + end + + data( + 'server_create tcp' => [:server_create, :tcp, {}], + 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], + # 'server_create unix' => [:server_create, :unix, {}], + 'server_create_connection tcp' => [:server_create_connection, :tcp, {}], + # 'server_create_connection unix' => [:server_create_connection, :unix, {}], + ) + test 'raise error if tls options specified for tcp/udp/unix' do |(m, proto, kwargs)| + assert_raise(ArgumentError.new("BUG: certopts is available only for tls")) do + @d.__send__(m, :myserver, PORT, proto: proto, certopts: {}, **kwargs){|x| x } + end + end + + data( + 'server_create tcp' => [:server_create, :tcp, {}], + 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], + # 'server_create tls' => [:server_create, :tls, {}], + 'server_create_connection tcp' => [:server_create_connection, :tcp, {}], + # 'server_create_connection tls' => [:server_create_connection, :tls, {}], + ) + test 'can bind specified IPv4 address' do |(m, proto, kwargs)| + @d.__send__(m, :myserver, PORT, proto: proto, bind: "127.0.0.1", **kwargs){|x| x } + assert_equal "127.0.0.1", @d._servers.first.bind + assert_equal "127.0.0.1", @d._servers.first.server.instance_eval{ instance_variable_defined?(:@listen_socket) ? @listen_socket : @_io }.addr[3] + end + + data( + 'server_create tcp' => [:server_create, :tcp, {}], + 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], + # 'server_create tls' => [:server_create, :tls, {}], + 'server_create_connection tcp' => [:server_create_connection, :tcp, {}], + # 'server_create_connection tls' => [:server_create_connection, :tls, {}], + ) + test 'can bind specified IPv6 address' do |(m, proto, kwargs)| # if available + omit "IPv6 unavailable here" unless ipv6_enabled? + @d.__send__(m, :myserver, PORT, proto: proto, bind: "::1", **kwargs){|x| x } + assert_equal "::1", @d._servers.first.bind + assert_equal "::1", @d._servers.first.server.instance_eval{ instance_variable_defined?(:@listen_socket) ? @listen_socket : @_io }.addr[3] + end + + data( + 'server_create tcp' => [:server_create, :tcp, {}], + 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], + # 'server_create tls' => [:server_create, :tls, {}], + # 'server_create unix' => [:server_create, :unix, {}], + 'server_create_connection tcp' => [:server_create, :tcp, {}], + # 'server_create_connection tls' => [:server_create, :tls, {}], + # 'server_create_connection unix' => [:server_create, :unix, {}], + ) + test 'can create 2 or more servers which share same bind address and port if shared option is true' do |(m, proto, kwargs)| + begin + d2 = Dummy.new; d2.start; d2.after_start + + assert_nothing_raised do + @d.__send__(m, :myserver, PORT, proto: proto, **kwargs){|x| x } + d2.__send__(m, :myserver, PORT, proto: proto, **kwargs){|x| x } + end + ensure + d2.stop; d2.before_shutdown; d2.shutdown; d2.after_shutdown; d2.close; d2.terminate + end + end + + data( + 'server_create tcp' => [:server_create, :tcp, {}], + 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], + # 'server_create tls' => [:server_create, :tls, {}], + # 'server_create unix' => [:server_create, :unix, {}], + 'server_create_connection tcp' => [:server_create, :tcp, {}], + # 'server_create_connection tls' => [:server_create, :tls, {}], + # 'server_create_connection unix' => [:server_create, :unix, {}], + ) + test 'cannot create 2 or more servers using same bind address and port if shared option is false' do |(m, proto, kwargs)| + begin + d2 = Dummy.new; d2.start; d2.after_start + + assert_nothing_raised do + @d.__send__(m, :myserver, PORT, proto: proto, shared: false, **kwargs){|x| x } + end + assert_raise(Errno::EADDRINUSE) do + d2.__send__(m, :myserver, PORT, proto: proto, **kwargs){|x| x } + end + ensure + d2.stop; d2.before_shutdown; d2.shutdown; d2.after_shutdown; d2.close; d2.terminate + end + end + end + + sub_test_case '#server_create' do + data( + 'tcp' => [:tcp, {}], + 'udp' => [:udp, {max_bytes: 128}], + # 'tls' => [:tls, {}], + # 'unix' => [:unix, {}], + ) + test 'raise error if block argument is not specified or too many' do |(proto, kwargs)| + assert_raise(ArgumentError.new("BUG: block must have 1 or 2 arguments")) do + @d.server_create(:myserver, PORT, proto: proto, **kwargs){ 1 } + end + assert_raise(ArgumentError.new("BUG: block must have 1 or 2 arguments")) do + @d.server_create(:myserver, PORT, proto: proto, **kwargs){|sock, conn, what_is_this| 1 } + end + end + + test 'creates udp server if specified in proto' do + @d.server_create(:myserver, PORT, proto: :udp, max_bytes: 512){|x| x } + + assert_equal :udp, @d._servers.first.proto + assert{ @d._servers.first.server.is_a? Fluent::PluginHelper::Server::EventHandler::UDPServer } + end + end + + sub_test_case '#server_create_tcp' do + test 'can accept all keyword arguments valid for tcp server' do + assert_nothing_raised do + @d.server_create_tcp(:s, PORT, bind: '127.0.0.1', shared: false, resolve_name: true, linger_timeout: 10, backlog: 500) do |data, conn| + # ... + end + end + end + + test 'creates a tcp server just to read data' do + received = "" + @d.server_create_tcp(:s, PORT) do |data| + received << data + end + 3.times do + sock = TCPSocket.new("127.0.0.1", PORT) + sock.puts "yay" + sock.puts "foo" + sock.close + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + end + + test 'creates a tcp server to read and write data' do + received = "" + responses = [] + @d.server_create_tcp(:s, PORT) do |data, conn| + received << data + conn.write "ack\n" + end + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "yay" + sock.puts "foo" + responses << sock.readline + end + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal ["ack\n","ack\n","ack\n"], responses + end + + test 'creates a tcp server to read and write data using IPv6' do + omit "IPv6 unavailable here" unless ipv6_enabled? + + received = "" + responses = [] + @d.server_create_tcp(:s, PORT, bind: "::1") do |data, conn| + received << data + conn.write "ack\n" + end + 3.times do + TCPSocket.open("::1", PORT) do |sock| + sock.puts "yay" + sock.puts "foo" + responses << sock.readline + end + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal ["ack\n","ack\n","ack\n"], responses + end + + test 'does not resolve name of client address in default' do + received = "" + sources = [] + @d.server_create_tcp(:s, PORT) do |data, conn| + received << data + sources << conn.remote_host + end + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "yay" + end + end + waiting(10){ sleep 0.1 until received.bytesize == 12 } + assert_equal "yay\nyay\nyay\n", received + assert{ sources.all?{|s| s == "127.0.0.1" } } + end + + test 'does resolve name of client address if resolve_name is true' do + received = "" + sources = [] + @d.server_create_tcp(:s, PORT, resolve_name: true) do |data, conn| + received << data + sources << conn.remote_host + end + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "yay" + end + end + waiting(10){ sleep 0.1 until received.bytesize == 12 } + assert_equal "yay\nyay\nyay\n", received + assert{ sources.all?{|s| s != "127.0.0.1" && Socket.getaddrinfo(s, PORT, Socket::AF_INET).any?{|i| i[3] == "127.0.0.1"} } } + end + + test 'can keep connections alive for tcp if keepalive specified' do + # pend "not implemented yet" + end + + test 'raises error if plugin registers data callback for connection object from #server_create' do + received = "" + errors = [] + @d.server_create_tcp(:s, PORT) do |data, conn| + received << data + begin + conn.data{|d| received << d.upcase } + rescue => e + errors << e + end + end + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 } + assert_equal "foo\n", received + assert_equal 1, errors.size + assert_equal "data callback can be registered just once, but registered twice", errors.first.message + end + + test 'can call write_complete callback if registered' do + buffer = "" + lines = [] + responses = [] + response_completes = [] + @d.server_create_tcp(:s, PORT) do |data, conn| + conn.on(:write_complete){|c| response_completes << true } + buffer << data + if idx = buffer.index("\n") + lines << buffer.slice!(0,idx+1) + conn.write "ack\n" + end + end + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.write "yay" + sock.write "foo\n" + begin + responses << sock.readline + rescue EOFError, IOError, Errno::ECONNRESET + # ignore + end + sock.close + end + end + waiting(10){ sleep 0.1 until lines.size == 3 && response_completes.size == 3 } + assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines + assert_equal ["ack\n","ack\n","ack\n"], responses + assert_equal [true, true, true], response_completes + end + + test 'can call close callback if registered' do + buffer = "" + lines = [] + callback_results = [] + @d.server_create_tcp(:s, PORT) do |data, conn| + conn.on(:close){|c| callback_results << "closed" } + buffer << data + if idx = buffer.index("\n") + lines << buffer.slice!(0,idx+1) + conn.write "ack\n" + end + end + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.write "yay" + sock.write "foo\n" + begin + while line = sock.readline + if line == "ack\n" + sock.close + end + end + rescue EOFError, IOError, Errno::ECONNRESET + # ignore + end + end + end + waiting(10){ sleep 0.1 until lines.size == 3 && callback_results.size == 3 } + assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines + assert_equal ["closed", "closed", "closed"], callback_results + end + end + + sub_test_case '#server_create_udp' do + test 'can accept all keyword arguments valid for udp server' do + assert_nothing_raised do + @d.server_create_udp(:s, PORT, bind: '127.0.0.1', shared: false, resolve_name: true, max_bytes: 100, flags: 1) do |data, conn| + # ... + end + end + end + + test 'creates a udp server just to read data' do + received = "" + @d.server_create_udp(:s, PORT, max_bytes: 128) do |data| + received << data + end + bind_port = unused_port + 3.times do + sock = UDPSocket.new(Socket::AF_INET) + sock.bind("127.0.0.1", bind_port) + sock.connect("127.0.0.1", PORT) + sock.puts "yay" + sock.puts "foo" + sock.close + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + end + + test 'creates a udp server to read and write data' do + received = "" + responses = [] + @d.server_create_udp(:s, PORT, max_bytes: 128) do |data, sock| + received << data + sock.write "ack\n" + end + bind_port = unused_port + 3.times do + begin + sock = UDPSocket.new(Socket::AF_INET) + sock.bind("127.0.0.1", bind_port) + sock.connect("127.0.0.1", PORT) + th = Thread.new do + while true + begin + in_data, _addr = sock.recvfrom_nonblock(16) + if in_data + responses << in_data + break + end + rescue IO::WaitReadable + IO.select([sock]) + end + end + true + end + sock.write "yay\nfoo\n" + th.join(5) + ensure + sock.close + end + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal ["ack\n","ack\n","ack\n"], responses + end + + test 'creates a udp server to read and write data using IPv6' do + omit "IPv6 unavailable here" unless ipv6_enabled? + + received = "" + responses = [] + @d.server_create_udp(:s, PORT, bind: "::1", max_bytes: 128) do |data, sock| + received << data + sock.write "ack\n" + end + bind_port = unused_port + 3.times do + begin + sock = UDPSocket.new(Socket::AF_INET6) + sock.bind("::1", bind_port) + th = Thread.new do + responses << sock.recv(16) + true + end + sock.connect("::1", PORT) + sock.write "yay\nfoo\n" + th.join(5) + ensure + sock.close + end + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal ["ack\n","ack\n","ack\n"], responses + end + + test 'does not resolve name of client address in default' do + received = "" + sources = [] + @d.server_create_udp(:s, PORT, max_bytes: 128) do |data, sock| + received << data + sources << sock.remote_host + end + 3.times do + sock = UDPSocket.new(Socket::AF_INET) + sock.connect("127.0.0.1", PORT) + sock.puts "yay" + sock.close + end + waiting(10){ sleep 0.1 until received.bytesize == 12 } + assert_equal "yay\nyay\nyay\n", received + assert{ sources.all?{|s| s == "127.0.0.1" } } + end + + test 'does resolve name of client address if resolve_name is true' do + received = "" + sources = [] + @d.server_create_udp(:s, PORT, resolve_name: true, max_bytes: 128) do |data, sock| + received << data + sources << sock.remote_host + end + 3.times do + sock = UDPSocket.new(Socket::AF_INET) + sock.connect("127.0.0.1", PORT) + sock.puts "yay" + sock.close + end + waiting(10){ sleep 0.1 until received.bytesize == 12 } + assert_equal "yay\nyay\nyay\n", received + assert{ sources.all?{|s| s != "127.0.0.1" && Socket.getaddrinfo(s, PORT, Socket::AF_INET).any?{|i| i[3] == "127.0.0.1"} } } + end + + test 'raises error if plugin registers data callback for connection object from #server_create' do + received = "" + errors = [] + @d.server_create_udp(:s, PORT, max_bytes: 128) do |data, sock| + received << data + begin + sock.data{|d| received << d.upcase } + rescue => e + errors << e + end + end + sock = UDPSocket.new(Socket::AF_INET) + sock.connect("127.0.0.1", PORT) + sock.write "foo\n" + sock.close + + waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 } + assert_equal "foo\n", received + assert_equal 1, errors.size + assert_equal "BUG: this event is disabled for udp: data", errors.first.message + end + + test 'raise error if plugin registers write_complete callback for udp' do + received = "" + errors = [] + @d.server_create_udp(:s, PORT, max_bytes: 128) do |data, sock| + received << data + begin + sock.on(:write_complete){|conn| "" } + rescue => e + errors << e + end + end + sock = UDPSocket.new(Socket::AF_INET) + sock.connect("127.0.0.1", PORT) + sock.write "foo\n" + sock.close + + waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 } + assert_equal "foo\n", received + assert_equal 1, errors.size + assert_equal "BUG: this event is disabled for udp: write_complete", errors.first.message + end + + test 'raises error if plugin registers close callback for udp' do + received = "" + errors = [] + @d.server_create_udp(:s, PORT, max_bytes: 128) do |data, sock| + received << data + begin + sock.on(:close){|d| "" } + rescue => e + errors << e + end + end + sock = UDPSocket.new(Socket::AF_INET) + sock.connect("127.0.0.1", PORT) + sock.write "foo\n" + sock.close + + waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 } + assert_equal "foo\n", received + assert_equal 1, errors.size + assert_equal "BUG: this event is disabled for udp: close", errors.first.message + end + end + + sub_test_case '#server_create_tls' do + # not implemented yet + + # test 'can accept all keyword arguments valid for tcp/tls server' + # test 'creates a tls server just to read data' + # test 'creates a tls server to read and write data' + # test 'creates a tls server to read and write data using IPv6' + + # many tests about certops + + # test 'does not resolve name of client address in default' + # test 'does resolve name of client address if resolve_name is true' + # test 'can keep connections alive for tls if keepalive specified' do + # pend "not implemented yet" + # end + + # test 'raises error if plugin registers data callback for connection object from #server_create' + # test 'can call write_complete callback if registered' + # test 'can call close callback if registered' + end + + sub_test_case '#server_create_unix' do + # not implemented yet + + # test 'can accept all keyword arguments valid for unix server' + # test 'creates a unix server just to read data' + # test 'creates a unix server to read and write data' + + # test 'raises error if plugin registers data callback for connection object from #server_create' + # test 'can call write_complete callback if registered' + # test 'can call close callback if registered' + end + + # run tests for tcp, tls and unix + sub_test_case '#server_create_connection' do + test 'raise error if udp is specified in proto' do + assert_raise(ArgumentError.new("BUG: cannot create connection for UDP")) do + @d.server_create_connection(:myserver, PORT, proto: :udp){|c| c } + end + end + + # def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block) + protocols = { + 'tcp' => [:tcp, {}], + # 'tls' => [:tls, {certopts: {}}], + # 'unix' => [:unix, {path: ""}], + } + + data(protocols) + test 'raise error if block argument is not specified or too many' do |(proto, kwargs)| + empty_block = ->(){} + assert_raise(ArgumentError.new("BUG: block must have just one argument")) do + @d.server_create_connection(:myserver, PORT, proto: proto, **kwargs, &empty_block) + end + assert_raise(ArgumentError.new("BUG: block must have just one argument")) do + @d.server_create_connection(:myserver, PORT, proto: proto, **kwargs){|conn, what_is_this| [conn, what_is_this] } + end + end + + data(protocols) + test 'does not resolve name of client address in default' do |(proto, kwargs)| + received = "" + sources = [] + @d.server_create_connection(:s, PORT, proto: proto, **kwargs) do |conn| + sources << conn.remote_host + conn.data do |d| + received << d + end + end + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "yay" + end + end + waiting(10){ sleep 0.1 until received.bytesize == 12 } + assert_equal "yay\nyay\nyay\n", received + assert{ sources.all?{|s| s == "127.0.0.1" } } + end + + data(protocols) + test 'does resolve name of client address if resolve_name is true' do |(proto, kwargs)| + received = "" + sources = [] + @d.server_create_connection(:s, PORT, proto: proto, **kwargs) do |conn| + sources << conn.remote_host + conn.data do |d| + received << d + end + end + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "yay" + end + end + waiting(10){ sleep 0.1 until received.bytesize == 12 } + assert_equal "yay\nyay\nyay\n", received + assert{ sources.all?{|s| s == "127.0.0.1" } } + end + + data(protocols) + test 'creates a server to provide connection, which can read, write and close' do |(proto, kwargs)| + lines = [] + buffer = "" + @d.server_create_connection(:s, PORT, proto: proto, **kwargs) do |conn| + conn.data do |d| + buffer << d + if buffer == "x" + buffer.slice!(0, 1) + conn.close + end + if idx = buffer.index("\n") + lines << buffer.slice!(0, idx + 1) + conn.write "foo!\n" + end + end + end + replied = [] + disconnecteds = [] + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "yay" + while line = sock.readline + replied << line + break + end + sock.write "x" + begin + sock.read + rescue => e + if e.is_a?(Errno::ECONNRESET) + disconnecteds << e.class + end + end + end + end + waiting(10){ sleep 0.1 until lines.size == 3 } + waiting(10){ sleep 0.1 until replied.size == 3 } + waiting(10){ sleep 0.1 until disconnecteds.size == 3 } + assert_equal ["yay\n", "yay\n", "yay\n"], lines + assert_equal ["foo!\n", "foo!\n", "foo!\n"], replied + assert_equal [Errno::ECONNRESET, Errno::ECONNRESET, Errno::ECONNRESET], disconnecteds + end + + data(protocols) + test 'creates a server to provide connection, which accepts callbacks for data, write_complete, and close' do |(proto, kwargs)| + lines = [] + buffer = "" + written = 0 + closed = 0 + @d.server_create_connection(:s, PORT, proto: proto, **kwargs) do |conn| + conn.on(:write_complete){|_conn| written += 1 } + conn.on(:close){|_conn| closed += 1 } + conn.on(:data) do |d| + buffer << d + if idx = buffer.index("\n") + lines << buffer.slice!(0, idx + 1) + conn.write "foo!\n" + end + end + end + replied = [] + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "yay" + while line = sock.readline + replied << line + break + end + end # TCP socket is closed here + end + waiting(10){ sleep 0.1 until lines.size == 3 } + waiting(10){ sleep 0.1 until replied.size == 3 } + waiting(10){ sleep 0.1 until closed == 3 } + assert_equal ["yay\n", "yay\n", "yay\n"], lines + assert_equal 3, written + assert_equal 3, closed + assert_equal ["foo!\n", "foo!\n", "foo!\n"], replied + end + + data(protocols) + test 'creates a server, and does not leak connections' do |(proto, kwargs)| + buffer = "" + closed = 0 + @d.server_create_connection(:s, PORT, proto: proto, **kwargs) do |conn| + conn.on(:close){|_c| closed += 1 } + conn.on(:data) do |d| + buffer << d + end + end + 3.times do + TCPSocket.open("127.0.0.1", PORT) do |sock| + sock.puts "yay" + end + end + waiting(10){ sleep 0.1 until buffer.bytesize == 12 } + waiting(10){ sleep 0.1 until closed == 3 } + assert_equal 0, @d.instance_eval{ @_server_connections.size } + end + + test 'can keep connections alive for tcp/tls if keepalive specified' do + # pend "not implemented yet" + end + end + +end From 93e5c1e8c786723787b0fb1f3d81513e00bf6f1e Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 10:54:18 +0900 Subject: [PATCH 17/29] show tests which be stuck --- .travis.yml | 2 ++ appveyor.yml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 90f7d13125..4fc3954ac2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,8 @@ language: ruby cache: bundler +script: bundle exec rake test TESTOPTS=-v + # http://rubies.travis-ci.org/ # See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c matrix: diff --git a/appveyor.yml b/appveyor.yml index 8818afc999..70b990a305 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -11,7 +11,7 @@ install: - bundle install build: off test_script: - - bundle exec rake test + - bundle exec rake test TESTOPTS=-v branches: only: From 3493bb91972b9a898861190a02cfd1fc7b7406d7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 15:48:31 +0900 Subject: [PATCH 18/29] update serverengine dependency to fix IPv6 support on Windows --- fluentd.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluentd.gemspec b/fluentd.gemspec index 95ee52095c..bdafa4d415 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -22,7 +22,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("msgpack", [">= 0.7.0", "< 2.0.0"]) gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"]) gem.add_runtime_dependency("cool.io", ["~> 1.4.5"]) - gem.add_runtime_dependency("serverengine", ["~> 2.0"]) + gem.add_runtime_dependency("serverengine", [">= 2.0.3", "< 3.0.0"]) gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"]) gem.add_runtime_dependency("sigdump", ["~> 0.2.2"]) gem.add_runtime_dependency("tzinfo", ["~> 1.0"]) From c12758ee694f4fbeadb4f779d116807fc86d3ac4 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 16:34:41 +0900 Subject: [PATCH 19/29] fix not to use power_assert to avoid SystemStackError on this assertion --- test/plugin_helper/test_server.rb | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index a4bb8a87bc..e9c84cc251 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -116,22 +116,29 @@ class Dummy < Fluent::Plugin::TestBase @d.__send__(m, :myserver, PORT){|x| x } assert_equal 1, @d._servers.size - assert_equal :myserver, @d._servers.first.title - assert_equal PORT, @d._servers.first.port - assert_equal :tcp, @d._servers.first.proto - assert_equal "0.0.0.0", @d._servers.first.bind + created_server_info = @d._servers.first - assert{ @d._servers.first.server.is_a? Coolio::TCPServer } - assert_equal "0.0.0.0", @d._servers.first.server.instance_eval{ @listen_socket }.addr[3] + assert_equal :myserver, created_server_info.title + assert_equal PORT, created_server_info.port + + assert_equal :tcp, created_server_info.proto + assert_equal "0.0.0.0", created_server_info.bind + + created_server = created_server_info.server + + assert created_server.is_a?(Coolio::TCPServer) + assert_equal "0.0.0.0", created_server.instance_eval{ @listen_socket }.addr[3] end data(methods) test 'creates tcp server if specified in proto' do |m| @d.__send__(m, :myserver, PORT, proto: :tcp){|x| x } - assert_equal :tcp, @d._servers.first.proto - assert{ @d._servers.first.server.is_a? Coolio::TCPServer } + created_server_info = @d._servers.first + assert_equal :tcp, created_server_info.proto + created_server = created_server_info.server + assert created_server.is_a?(Coolio::TCPServer) end # tests about "proto: :udp" is in #server_create @@ -294,8 +301,10 @@ class Dummy < Fluent::Plugin::TestBase test 'creates udp server if specified in proto' do @d.server_create(:myserver, PORT, proto: :udp, max_bytes: 512){|x| x } - assert_equal :udp, @d._servers.first.proto - assert{ @d._servers.first.server.is_a? Fluent::PluginHelper::Server::EventHandler::UDPServer } + created_server_info = @d._servers.first + assert_equal :udp, created_server_info.proto + created_server = created_server_info.server + assert created_server.is_a?(Fluent::PluginHelper::Server::EventHandler::UDPServer) end end From f29de3e1793923447da5b5bf52c95024b76b2320 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Tue, 22 Nov 2016 16:35:34 +0900 Subject: [PATCH 20/29] fix to assert hostname directly for environments which does not have hostname only for 127.0.0.1 --- test/plugin_helper/test_server.rb | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index e9c84cc251..d38ae38a3e 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -390,6 +390,8 @@ class Dummy < Fluent::Plugin::TestBase end test 'does resolve name of client address if resolve_name is true' do + hostname = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"])[0] + received = "" sources = [] @d.server_create_tcp(:s, PORT, resolve_name: true) do |data, conn| @@ -403,7 +405,7 @@ class Dummy < Fluent::Plugin::TestBase end waiting(10){ sleep 0.1 until received.bytesize == 12 } assert_equal "yay\nyay\nyay\n", received - assert{ sources.all?{|s| s != "127.0.0.1" && Socket.getaddrinfo(s, PORT, Socket::AF_INET).any?{|i| i[3] == "127.0.0.1"} } } + assert{ sources.all?{|s| s == hostname } } end test 'can keep connections alive for tcp if keepalive specified' do @@ -608,6 +610,8 @@ class Dummy < Fluent::Plugin::TestBase end test 'does resolve name of client address if resolve_name is true' do + hostname = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"])[0] + received = "" sources = [] @d.server_create_udp(:s, PORT, resolve_name: true, max_bytes: 128) do |data, sock| @@ -622,7 +626,7 @@ class Dummy < Fluent::Plugin::TestBase end waiting(10){ sleep 0.1 until received.bytesize == 12 } assert_equal "yay\nyay\nyay\n", received - assert{ sources.all?{|s| s != "127.0.0.1" && Socket.getaddrinfo(s, PORT, Socket::AF_INET).any?{|i| i[3] == "127.0.0.1"} } } + assert{ sources.all?{|s| s == hostname } } end test 'raises error if plugin registers data callback for connection object from #server_create' do @@ -773,9 +777,11 @@ class Dummy < Fluent::Plugin::TestBase data(protocols) test 'does resolve name of client address if resolve_name is true' do |(proto, kwargs)| + hostname = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"])[0] + received = "" sources = [] - @d.server_create_connection(:s, PORT, proto: proto, **kwargs) do |conn| + @d.server_create_connection(:s, PORT, proto: proto, resolve_name: true, **kwargs) do |conn| sources << conn.remote_host conn.data do |d| received << d @@ -788,7 +794,7 @@ class Dummy < Fluent::Plugin::TestBase end waiting(10){ sleep 0.1 until received.bytesize == 12 } assert_equal "yay\nyay\nyay\n", received - assert{ sources.all?{|s| s == "127.0.0.1" } } + assert{ sources.all?{|s| s == hostname } } end data(protocols) From 5a97f75b887f0a1c50bc5093300b38bd61489866 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 24 Nov 2016 10:58:41 +0900 Subject: [PATCH 21/29] update ServerEngine not to crash for IPv6 & Windows --- fluentd.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluentd.gemspec b/fluentd.gemspec index bdafa4d415..730fca6f5a 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -22,7 +22,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("msgpack", [">= 0.7.0", "< 2.0.0"]) gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"]) gem.add_runtime_dependency("cool.io", ["~> 1.4.5"]) - gem.add_runtime_dependency("serverengine", [">= 2.0.3", "< 3.0.0"]) + gem.add_runtime_dependency("serverengine", [">= 2.0.4", "< 3.0.0"]) gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"]) gem.add_runtime_dependency("sigdump", ["~> 0.2.2"]) gem.add_runtime_dependency("tzinfo", ["~> 1.0"]) From f1f5dc5b85144cea10631bceb50c245a356dced8 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 24 Nov 2016 11:13:55 +0900 Subject: [PATCH 22/29] refer both of source_hostname_key and source_host_key (as optional parameter) --- lib/fluent/plugin/in_tcp.rb | 4 +++- lib/fluent/plugin/in_udp.rb | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index 37dc5a10ba..8fdc4cf7d2 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -43,6 +43,8 @@ def configure(conf) compat_parameters_convert(conf, :parser) super @_event_loop_blocking_timeout = @blocking_timeout + @source_hostname_key ||= @source_host_key if @source_host_key + @parser = parser_create end @@ -67,7 +69,7 @@ def start tag = extract_tag_from_record(record) tag ||= @tag time ||= extract_time_from_record(record) || Fluent::EventTime.now - record[@source_host_key] = conn.remote_host if @source_host_key + record[@source_hostname_key] = conn.remote_host if @source_hostname_key router.emit(tag, time, record) end end diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index d4718a8c1d..21f8cb2441 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -42,6 +42,8 @@ def configure(conf) compat_parameters_convert(conf, :parser) super @_event_loop_blocking_timeout = @blocking_timeout + @source_hostname_key ||= @source_host_key if @source_host_key + @parser = parser_create end @@ -61,7 +63,7 @@ def start tag = extract_tag_from_record(record) tag ||= @tag time ||= extract_time_from_record(record) || Fluent::EventTime.now - record[@source_host_key] = sock.remote_host if @source_host_key + record[@source_hostname_key] = sock.remote_host if @source_hostname_key router.emit(tag, time, record) end end From 89ebb4685dc94d17b8ea2cf6087e488ec94a021f Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 24 Nov 2016 11:14:21 +0900 Subject: [PATCH 23/29] simplify code and inspect objects for errors --- lib/fluent/plugin_helper/server.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index ae82bf4530..8156f77ad4 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -311,9 +311,7 @@ def on(event, &callback) when :data @sock.data(&callback) when :write_complete - cb = ->(){ - callback.call(self) - } + cb = ->(){ callback.call(self) } @sock.on_write_complete(&cb) when :close cb = ->(){ callback.call(self) } @@ -413,7 +411,7 @@ class TCPServer < Coolio::TCPSocket SOCK_OPT_FORMAT = 'I!I!' # { int l_onoff; int l_linger; } def initialize(sock, close_callback, resolve_name, linger_timeout, log, under_plugin_development, connect_callback) - raise ArgumentError, "socket must be a TCPSocket" unless sock.is_a?(TCPSocket) + raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket) sock_opt = [1, linger_timeout].pack(SOCK_OPT_FORMAT) sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, sock_opt) From 64973190689171ba24a21994436e5a7212f67a1b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 24 Nov 2016 11:54:13 +0900 Subject: [PATCH 24/29] add helper method to get unused UDP ports --- test/helper.rb | 37 ++++++++++++++++++++++++++++++- test/plugin_helper/test_server.rb | 2 +- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/test/helper.rb b/test/helper.rb index 16ef30f348..592bc423a6 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -69,7 +69,18 @@ class Test::Unit::AssertionFailedError < StandardError include Fluent::Test::Helpers -def unused_port(num = 1) +def unused_port(num = 1, protocol: :tcp, bind: "0.0.0.0") + case protocol + when :tcp + unused_port_tcp(num) + when :udp + unused_port_udp(num, bind: bind) + else + raise ArgumentError, "unknown protocol: #{protocol}" + end +end + +def unused_port_tcp(num = 1) ports = [] sockets = [] num.times do @@ -85,6 +96,30 @@ def unused_port(num = 1) end end +PORT_RANGE_AVAILABLE = (1024...65535) + +def unused_port_udp(num = 1, bind: "0.0.0.0") + family = IPAddr.new(IPSocket.getaddress(bind)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6 + ports = [] + sockets = [] + while ports.size < num + port = rand(PORT_RANGE_AVAILABLE) + u = UDPSocket.new(family) + if (u.bind(bind, port) rescue nil) + ports << port + sockets << u + else + u.close + end + end + sockets.each{|s| s.close } + if num == 1 + return ports.first + else + return *ports + end +end + def waiting(seconds, logs: nil, plugin: nil) begin Timeout.timeout(seconds) do diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index d38ae38a3e..039e2b7deb 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -510,7 +510,7 @@ class Dummy < Fluent::Plugin::TestBase @d.server_create_udp(:s, PORT, max_bytes: 128) do |data| received << data end - bind_port = unused_port + bind_port = unused_port(protocol: :udp, bind: "127.0.0.1") 3.times do sock = UDPSocket.new(Socket::AF_INET) sock.bind("127.0.0.1", bind_port) From 742b79ff9704abb76b878ce9d9c35fa0a25cec8b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 24 Nov 2016 11:55:14 +0900 Subject: [PATCH 25/29] revert verbose testing option for CI services --- .travis.yml | 2 +- appveyor.yml | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4fc3954ac2..418341a535 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: ruby cache: bundler -script: bundle exec rake test TESTOPTS=-v +# script: bundle exec rake test TESTOPTS=-v # http://rubies.travis-ci.org/ # See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c diff --git a/appveyor.yml b/appveyor.yml index 70b990a305..4f72f5f148 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -11,7 +11,8 @@ install: - bundle install build: off test_script: - - bundle exec rake test TESTOPTS=-v + - bundle exec rake test +# - bundle exec rake test TESTOPTS=-v branches: only: From cf763b9efd96807f396ad35b86c31b8aa47c8379 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Thu, 24 Nov 2016 11:58:14 +0900 Subject: [PATCH 26/29] removed debugging message --- lib/fluent/plugin/in_forward.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 4cc3323369..bb40bc9b9e 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -322,8 +322,6 @@ def on_message(msg, chunk_size, remote_host) # return option for response option - ensure - p(here: "ensure of on_message", error: $!) if $! end def invalid_event?(tag, time, record) From a29481f02b27eae4fdbf5261cdb60f75c3b79edc Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Fri, 25 Nov 2016 11:57:54 +0900 Subject: [PATCH 27/29] separate setting socket options to share it with socket plugin helper --- lib/fluent/plugin_helper/server.rb | 57 ++++++++-------- lib/fluent/plugin_helper/socket_option.rb | 80 +++++++++++++++++++++++ 2 files changed, 106 insertions(+), 31 deletions(-) create mode 100644 lib/fluent/plugin_helper/socket_option.rb diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 8156f77ad4..163b77ba90 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -22,14 +22,16 @@ require 'ipaddr' require 'fcntl' +require_relative 'socket_option' + module Fluent module PluginHelper module Server include Fluent::PluginHelper::EventLoop + include Fluent::PluginHelper::SocketOption # This plugin helper doesn't support these things for now: # * SSL/TLS (TBD) - # * IPv6 # * TCP/TLS keepalive # stop : [-] @@ -61,7 +63,7 @@ def server_wait_until_stop # conn.close # end # end - def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block) + def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, **socket_options, &block) raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) @@ -70,16 +72,16 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: raise ArgumentError, "BUG: block not specified which handles connection" unless block_given? raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1 - if proto != :tls # TLS options - raise ArgumentError, "BUG: certopts is available only for tls" if certopts - end - if proto != :tcp && proto != :tls # TCP/TLS options - raise ArgumentError, "BUG: linger_timeout is available for tcp/tls" if linger_timeout != 0 + if proto == :tcp || proto == :tls # default linger_timeout only for server + socket_options[:linger_timeout] ||= 0 end + socket_option_validate!(proto, **socket_options) + socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) } + case proto when :tcp - server = server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) + server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) when :tls raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts # server_certopts_validate!(certopts) @@ -106,7 +108,7 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: # sock.remote_port # # ... # end - def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, max_bytes: nil, flags: 0, &callback) + def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback) raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) @@ -114,12 +116,13 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certo raise ArgumentError, "BUG: block not specified which handles received data" unless block_given? raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2 - if proto != :tls # TLS options - raise ArgumentError, "BUG: certopts is available only for tls" if certopts - end - if proto != :tcp && proto != :tls # TCP/TLS options - raise ArgumentError, "BUG: linger_timeout is available for tcp/tls" if linger_timeout != 0 + if proto == :tcp || proto == :tls # default linger_timeout only for server + socket_options[:linger_timeout] ||= 0 end + + socket_option_validate!(proto, **socket_options) + socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) } + if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog end @@ -130,18 +133,16 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certo case proto when :tcp - server = server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog) do |conn| + server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn| conn.data(&callback) end when :tls - raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts - server_certopts_validate!(certopts) raise "not implemented yet" when :udp raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes sock = server_create_udp_socket(shared, bind, port) - sock.do_not_reverse_lookup = !resolve_name - server = EventHandler::UDPServer.new(sock, resolve_name, max_bytes, flags, @log, @under_plugin_development, &callback) + socket_option_setter.call(sock) + server = EventHandler::UDPServer.new(sock, max_bytes, flags, @log, @under_plugin_development, &callback) when :unix raise "not implemented yet" else @@ -174,11 +175,11 @@ def server_attach(title, proto, port, bind, shared, server) event_loop_attach(server) end - def server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) + def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) sock = server_create_tcp_socket(shared, bind, port) - sock.do_not_reverse_lookup = !resolve_name + socket_option_setter.call(sock) close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } } - server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, close_callback, resolve_name, linger_timeout, @log, @under_plugin_development, block) do |conn| + server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn| @_server_mutex.synchronize do @_server_connections << conn end @@ -357,20 +358,18 @@ def write(data) module EventHandler class UDPServer < Coolio::IO - def initialize(sock, resolve_name, max_bytes, flags, log, under_plugin_development, &callback) + def initialize(sock, max_bytes, flags, log, under_plugin_development, &callback) raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket) super(sock) @sock = sock - @resolve_name = resolve_name @max_bytes = max_bytes @flags = flags @log = log @under_plugin_development = under_plugin_development @callback = callback - @sock.do_not_reverse_lookup = !resolve_name on_readable_impl = case @callback.arity when 1 then :on_readable_without_sock when 2 then :on_readable_with_sock @@ -408,14 +407,10 @@ def on_readable_with_sock end class TCPServer < Coolio::TCPSocket - SOCK_OPT_FORMAT = 'I!I!' # { int l_onoff; int l_linger; } - - def initialize(sock, close_callback, resolve_name, linger_timeout, log, under_plugin_development, connect_callback) + def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket) - sock_opt = [1, linger_timeout].pack(SOCK_OPT_FORMAT) - sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, sock_opt) - sock.do_not_reverse_lookup = !resolve_name + socket_option_setter.call(sock) @_handler_socket = sock super(sock) diff --git a/lib/fluent/plugin_helper/socket_option.rb b/lib/fluent/plugin_helper/socket_option.rb new file mode 100644 index 0000000000..7ef9a933f0 --- /dev/null +++ b/lib/fluent/plugin_helper/socket_option.rb @@ -0,0 +1,80 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'socket' + +# this module is only for Socket/Server plugin helpers +module Fluent + module PluginHelper + module SocketOption + FORMAT_STRUCT_LINGER = 'I!I!' # { int l_onoff; int l_linger; } + FORMAT_STRUCT_TIMEVAL = 'L!L!' # { time_t tv_sec; suseconds_t tv_usec; } + + def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil) + unless resolve_name.nil? + if protocol != :tcp && protocol != :udp && protocol != :tls + raise ArgumentError, "BUG: resolve_name in available for tcp/udp/tls" + end + end + if linger_timeout + if protocol != :tcp && protocol != :tls + raise ArgumentError, "BUG: linger_timeout is available for tcp/tls" + end + end + if certopts + if protocol != :tls + raise ArgumentError, "BUG: certopts is available only for tls" + end + else + if protocol == :tls + raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" + socket_option_certopts_validate!(certopts) + end + end + end + + def socket_option_certopts_validate!(certopts) + raise "not implemented yet" + end + + def socket_option_set(sock, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil) + unless resolve_name.nil? + sock.do_not_reverse_lookup = !resolve_name + end + if linger_timeout + optval = [1, linger_timeout.to_i].pack(FORMAT_STRUCT_LINGER) + socket_option_set_one(sock, :SO_LINGER, optval) + end + if recv_timeout + optval = [recv_timeout.to_i, 0].pack(FORMAT_STRUCT_TIMEVAL) + socket_option_set_one(sock, :SO_RCVTIMEO, optval) + end + if send_timeout + optval = [send_timeout.to_i, 0].pack(FORMAT_STRUCT_TIMEVAL) + socket_option_set_one(sock, :SO_SNDTIMEO, optval) + end + # TODO: certopts for TLS + sock + end + + def socket_option_set_one(sock, option, value) + sock.setsockopt(Socket::SOL_SOCKET, option, value) + rescue => e + log.warn "failed to set socket option", sock: sock.class, option: option, value: value, error: e + end + end + end +end From ecc09590f47451c81f259de84a3046fc982e9450 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Fri, 25 Nov 2016 17:51:31 +0900 Subject: [PATCH 28/29] rescue Errno::ECONNRESET for cases ICMP has trouble (for getting hostname) --- lib/fluent/plugin/out_forward.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 11db4dce9a..a8ee808fbc 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -317,7 +317,7 @@ def initialize(io, callback) def on_readable begin msg, addr = @io.recvfrom(1024) - rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET return end host = addr[3] From 4e63ef4cc487d35541c1823ac54d8e3d872cc0c5 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi <tagomoris@gmail.com> Date: Fri, 25 Nov 2016 17:52:15 +0900 Subject: [PATCH 29/29] require top level ServerEngine --- lib/fluent/plugin_helper/server.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 163b77ba90..b6d28bf2ef 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -16,7 +16,7 @@ require 'fluent/plugin_helper/event_loop' -require 'serverengine/socket_manager' +require 'serverengine' require 'cool.io' require 'socket' require 'ipaddr'