From a01883a2762042ac2efa6d82f697c2b3a1b7ee73 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 15 Nov 2016 17:28:26 +0900 Subject: [PATCH 01/29] make sure to mark as loop not running --- lib/fluent/plugin_helper/event_loop.rb | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin_helper/event_loop.rb b/lib/fluent/plugin_helper/event_loop.rb index eb45f5c5f9..909afdb294 100644 --- a/lib/fluent/plugin_helper/event_loop.rb +++ b/lib/fluent/plugin_helper/event_loop.rb @@ -65,11 +65,14 @@ def start # event loop does not run here, so mutex lock is not required thread_create :event_loop do - default_watcher = DefaultWatcher.new - @_event_loop.attach(default_watcher) - @_event_loop_running = true - @_event_loop.run(@_event_loop_run_timeout) # this method blocks - @_event_loop_running = false + begin + default_watcher = DefaultWatcher.new + @_event_loop.attach(default_watcher) + @_event_loop_running = true + @_event_loop.run(@_event_loop_run_timeout) # this method blocks + ensure + @_event_loop_running = false + end end end From 6874665d73f4258211d93269d96466356c248215 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 16 Nov 2016 18:23:42 +0900 Subject: [PATCH 02/29] add server plugin helper --- lib/fluent/plugin_helper.rb | 2 + lib/fluent/plugin_helper/server.rb | 411 +++++++++++++++++++++++++++++ lib/fluent/test/driver/base.rb | 20 ++ 3 files changed, 433 insertions(+) create mode 100644 lib/fluent/plugin_helper/server.rb diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index 8977f64913..d37519a9cb 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -24,6 +24,8 @@ require 'fluent/plugin_helper/formatter' require 'fluent/plugin_helper/inject' require 'fluent/plugin_helper/extract' +# require 'fluent/plugin_helper/socket' +require 'fluent/plugin_helper/server' require 'fluent/plugin_helper/retry_state' require 'fluent/plugin_helper/compat_parameters' diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb new file mode 100644 index 0000000000..8b4660e1b6 --- /dev/null +++ b/lib/fluent/plugin_helper/server.rb @@ -0,0 +1,411 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin_helper/event_loop' + +require 'serverengine/socket_manager' +require 'cool.io' +require 'socket' +require 'ipaddr' +require 'fcntl' + +module Fluent + module PluginHelper + module Server + include Fluent::PluginHelper::EventLoop + + # This plugin helper doesn't support these things for now: + # * SSL/TLS (TBD) + # * IPv6 + # * TCP/TLS keepalive + + # stop : [-] + # shutdown : detach server event handler from event loop (event_loop) + # close : close listening sockets + # terminate: remote all server instances + + attr_reader :_servers # for tests + + def server_wait_until_start + # event_loop_wait_until_start works well for this + end + + def server_wait_until_stop + sleep 0.1 while @_servers.any?{|si| si.server.attached? } + @_servers.each{|si| si.server.close rescue nil } + end + + PROTOCOLS = [:tcp, :udp, :tls, :unix] + CONNECTION_PROTOCOLS = [:tcp, :tls, :unix] + + # server_create_connection(:title, @port) do |conn| + # # on connection + # source_addr = conn.remote_host + # source_port = conn.remote_port + # conn.data do |data| + # # on data + # conn.write resp # ... + # conn.close + # end + # end + def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block) + raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol + raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto) + raise ArgumentError, "BUG: block not specified which handles connection" unless block_given? + raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1 + + case proto + when :tcp + server = server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) + when :tls + raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts + # server_certopts_validate!(certopts) + # sock = server_create_tls_socket(shared, bind, port) + # server = nil # ... + raise "not implemented yet" + when :unix + raise "not implemented yet" + else + raise "unknown protocol #{proto}" + end + + server_attach(title, port, bind, shared, server) + end + + # server_create(:title, @port) do |data| + # # ... + # end + # server_create(:title, @port) do |data, conn| + # # ... + # end + # server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock| + # sock.remote_host + # sock.remote_port + # # ... + # end + def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, max_bytes: nil, flags: 0, &callback) + raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol + raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) + + raise ArgumentError, "BUG: block not specified which handles received data" unless block_given? + raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2 + + case proto + when :tcp + server = server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog) do |conn| + conn.data(&callback) + end + when :tls + raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts + server_certopts_validate!(certopts) + raise "not implemented yet" + when :udp + raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes + sock = server_create_udp_socket(shared, bind, port) + sock.do_not_reverse_lookup = !resolve_name + server = EventHandler::UDPServer.new(sock, resolve_name, max_bytes, flags, @log, @under_plugin_development, &callback) + when :unix + raise "not implemented yet" + else + raise "BUG: unknown protocol #{proto}" + end + + server_attach(title, port, bind, shared, server) + end + + def server_create_tcp(title, port, **kwargs, &callback) + server_create(title, port, proto: :tcp, **kwargs, &callback) + end + + def server_create_udp(title, port, **kwargs, &callback) + server_create(title, port, proto: :udp, **kwargs, &callback) + end + + ServerInfo = Struct.new(:title, :port, :bind, :shared, :server) + + def server_attach(title, port, bind, shared, server) + @_servers << ServerInfo.new(title, port, bind, shared, server) + event_loop_attach(server) + end + + def server_create_for_tcp_connection(shared, bind, port, resolve_name, linger_timeout, backlog, &block) + sock = server_create_tcp_socket(shared, bind, port) + server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, resolve_name, linger_timeout, @log, @under_plugin_development, block) + server.listen(backlog) if backlog + server + end + + def initialize + super + @_servers = [] + end + + def close + @_servers.each do |si| + si.server.close rescue nil + end + + super + end + + def terminate + @_servers = [] + super + end + + def server_certopts_validate!(certopts) + raise "not implemented yet" + end + + def server_socket_manager_client + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + if Fluent.windows? + socket_manager_path = socket_manager_path.to_i + end + ServerEngine::SocketManager::Client.new(socket_manager_path) + end + + def server_create_tcp_socket(shared, bind, port) + sock = if shared + server_socket_manager_client.listen_tcp(bind, port) + else + TCPServer.new(bind, port) # this method call can create sockets for AF_INET6 + end + sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec + sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock + sock + end + + def server_create_udp_socket(shared, bind, port) + sock = if shared + server_socket_manager_client.listen_udp(bind, port) + else + family = IPAddr.new(IPSocket.getaddress(bind)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6 + usock = UDPSocket.new(family) + usock.bind(bind, port) + usock + end + sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) # close-on-exec + sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock + sock + end + + def server_create_tls_socket(shared, bind, port) + raise "not implemented yet" + end + + class CallbackSocket + def initialize(server_type, sock, enabled_events = []) + @server_type = server_type + @sock = sock + @enabled_events = enabled_events + end + + def remote_addr + @sock.peeraddr[3] + end + + def remote_host + @sock.peeraddr[2] + end + + def remote_port + @sock.peeraddr[1] + end + + def send(data, flag) + @sock.send(data, flag) + end + + def write(data) + @sock.write(data) + end + + def close + @sock.close + end + + def data(&callback) + on(:data, &callback) + end + + def on(event, &callback) + raise "BUG: this event is disabled for #{@server_type}" unless @enabled_events.include?(event) + case event + when :data + @sock.data(&callback) + when :write_complete + @sock.on_write_complete(&callback) + when :before_close + @sock.before_close(&callback) + when :close + @sock.on_close(&callback) + else + raise "BUG: unknown event: #{event}" + end + end + end + + class TCPCallbackSocket < CallbackSocket + def initialize(sock) + super("tcp", sock, [:data, :write_complete, :before_close, :close]) + end + end + + class UDPCallbackSocket < CallbackSocket + def initialize(sock) + super("udp", sock, [:write_complete]) + end + + def write(data) + self.send(data, 0) + end + end + + module EventHandler + class UDPServer < Coolio::IO + def initialize(sock, resolve_name, max_bytes, flags, log, under_plugin_development, &callback) + raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket) + + super(sock) + + @sock = sock + @resolve_name = resolve_name + @max_bytes = max_bytes + @flags = flags + @log = log + @under_plugin_development = under_plugin_development + @callback = callback + + @sock.do_not_reverse_lookup = !resolve_name + on_readable_impl = case @callback.arity + when 1 then :on_readable_without_sock + when 2 then :on_readable_with_sock + else + raise "BUG: callback block must have 1 or 2 arguments" + end + self.define_singleton_method(:on_readable, method(on_readable_impl)) + end + + def on_readable_without_sock + begin + data = @sock.recv(@max_bytes, @flags) + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + return + end + @callback.call(data) + rescue => e + @log.error "unexpected error in processing UDP data", error: e + @log.error_backtrace + raise if @under_plugin_development + end + + def on_readable_with_sock + begin + data, addr = @sock.recvfrom(@max_bytes) + rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR + return + end + sock = UDPSocket.new(addr[0]) # Address family: "AF_INET", "AF_INET6" + sock.do_not_reverse_lookup = !@resolve_name + sock.connect(addr[3], addr[1]) + @callback.call(data, UDPCallbackSocket.new(sock)) + rescue => e + @log.error "unexpected error in processing UDP data", error: e + @log.error_backtrace + raise if @under_plugin_development + end + end + + class TCPServer < Coolio::TCPSocket + SOCK_OPT_FORMAT = 'I!I!' # { int l_onoff; int l_linger; } + + def initialize(sock, resolve_name, linger_timeout, log, under_plugin_development, connect_callback) + raise ArgumentError, "socket must be a TCPSocket" unless sock.is_a?(TCPSocket) + + super(sock) + + sock_opt = [1, linger_timeout].pack(SOCK_OPT_FORMAT) + sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, sock_opt) + + @resolve_name = resolve_name + @log = log + @connect_callback = connect_callback + + @under_plugin_development = under_plugin_development + + @data_callback = nil + @closing = false + @mutex = Mutex.new # to serialize #write and #close + end + + def data(&callback) + @data_callback = callback + on_read_impl = case callback.arity + when 1 then :on_read_without_connection + when 2 then :on_read_with_connection + else + raise "BUG: callback block must have 1 or 2 arguments" + end + self.define_singleton_method(:on_read, method(on_read_impl)) + end + + def write(data) + raise IOError, "server TCP connection is already going to be closed" if @closing + @mutex.synchronize do + super + end + end + + def on_connect + conn = TCPCallbackSocket.new(self) + @connect_callback.call(conn) + unless @data_callback + raise "connection callback must call #data to set data callback" + end + end + + def on_read_without_connection(data) + @data_callback.call(data) + rescue => e + @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e + @log.error_backtrace + close(true) rescue nil + raise if @under_plugin_development + end + + def on_read_with_connection(data) + @data_callback.call(data, self) + rescue => e + @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e + @log.error_backtrace + close(true) rescue nil + raise if @under_plugin_development + end + + def close(force = false) + @closing = true + if force + super() + else + @mutex.synchronize{ super() } + end + end + end + end + end + end +end diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb index 6b7d5af237..1a5fbe39c3 100644 --- a/lib/fluent/test/driver/base.rb +++ b/lib/fluent/test/driver/base.rb @@ -18,6 +18,8 @@ require 'fluent/config/element' require 'fluent/log' +require 'serverengine/socket_manager' +require 'fileutils' require 'timeout' module Fluent @@ -46,6 +48,8 @@ def initialize(klass, opts: {}, &block) end @instance.under_plugin_development = true + @socket_manager_server = nil + @logs = [] @test_clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC @@ -101,6 +105,13 @@ def run(timeout: nil, start: true, shutdown: true, &block) end def instance_start + if @instance.respond_to?(:server_wait_until_start) + @socket_manager_path = ServerEngine::SocketManager::Server.generate_path + FileUtils.rm_f @socket_manager_path if File.exist?(@socket_manager_path) + @socket_manager_server = ServerEngine::SocketManager::Server.open(@socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s + end + unless @instance.started? @instance.start end @@ -145,7 +156,16 @@ def instance_shutdown @instance.thread_wait_until_stop end + if @instance.respond_to?(:server_wait_until_stop) + @instance.server_wait_until_stop + end + @instance.terminate unless @instance.terminated? + + if @socket_manager_server + @socket_manager_server.close + FileUtils.rm_f @socket_manager_path if File.exist?(@socket_manager_path) + end end def run_actual(timeout: DEFAULT_TIMEOUT, &block) From 1e7b78fa7035084be7756f6694f49ce8bcc180be Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 16 Nov 2016 18:24:22 +0900 Subject: [PATCH 03/29] migrate plugins to v0.14 APIs with server plugin helper --- lib/fluent/plugin/in_tcp.rb | 66 ++++++++++--- lib/fluent/plugin/in_udp.rb | 60 +++++++++--- test/plugin/test_in_tcp.rb | 140 ++++++++++++++++------------ test/plugin/test_in_udp.rb | 181 ++++++++++++++++++++---------------- 4 files changed, 278 insertions(+), 169 deletions(-) diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index addfc604c4..37dc5a10ba 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -14,28 +14,66 @@ # limitations under the License. # -require 'cool.io' +require 'fluent/plugin/input' -require 'fluent/plugin/socket_util' +module Fluent::Plugin + class TcpInput < Input + Fluent::Plugin.register_input('tcp', self) -module Fluent - class TcpInput < SocketUtil::BaseInput - Plugin.register_input('tcp', self) + helpers :server, :parser, :extract, :compat_parameters + + desc 'Tag of output events.' + config_param :tag, :string + desc 'The port to listen to.' + config_param :port, :integer, default: 5170 + desc 'The bind address to listen to.' + config_param :bind, :string, default: '0.0.0.0' + + desc "The field name of the client's hostname." + config_param :source_host_key, :string, default: nil, deprecated: "use source_hostname_key instead." + desc "The field name of the client's hostname." + config_param :source_hostname_key, :string, default: nil + + config_param :blocking_timeout, :time, default: 0.5 - config_set_default :port, 5170 desc 'The payload is read up to this character.' config_param :delimiter, :string, default: "\n" # syslog family add "\n" to each message and this seems only way to split messages in tcp stream - def listen(callback) - log.info "listening tcp socket on #{@bind}:#{@port}" + def configure(conf) + compat_parameters_convert(conf, :parser) + super + @_event_loop_blocking_timeout = @blocking_timeout + @parser = parser_create + end + + def start + super + + @buffer = '' + server_create(:in_tcp_server, @port, proto: :tcp, bind: @bind) do |data, conn| + @buffer << data + begin + pos = 0 + while i = @buffer.index(@delimiter, pos) + msg = @buffer[pos...i] + pos = i + @delimiter.length + + @parser.parse(msg) do |time, record| + unless time && record + log.warn "pattern not match", message: msg + next + end - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + tag = extract_tag_from_record(record) + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now + record[@source_host_key] = conn.remote_host if @source_host_key + router.emit(tag, time, record) + end + end + @buffer.slice!(0, pos) if pos > 0 + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - lsock = client.listen_tcp(@bind, @port) - Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, @delimiter, callback) end end end diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index 14f08fc574..d4718a8c1d 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -14,24 +14,58 @@ # limitations under the License. # -require 'fluent/plugin/socket_util' +require 'fluent/plugin/input' -module Fluent - class UdpInput < SocketUtil::BaseInput - Plugin.register_input('udp', self) +module Fluent::Plugin + class UdpInput < Input + Fluent::Plugin.register_input('udp', self) + + helpers :server, :parser, :extract, :compat_parameters + + desc 'Tag of output events.' + config_param :tag, :string + desc 'The port to listen to.' + config_param :port, :integer, default: 5160 + desc 'The bind address to listen to.' + config_param :bind, :string, default: '0.0.0.0' + + desc "The field name of the client's hostname." + config_param :source_host_key, :string, default: nil, deprecated: "use source_hostname_key instead." + desc "The field name of the client's hostname." + config_param :source_hostname_key, :string, default: nil - config_set_default :port, 5160 config_param :body_size_limit, :size, default: 4096 - def listen(callback) - log.info "listening udp socket on #{@bind}:#{@port}" - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] - if Fluent.windows? - socket_manager_path = socket_manager_path.to_i + config_param :blocking_timeout, :time, default: 0.5 + + def configure(conf) + compat_parameters_convert(conf, :parser) + super + @_event_loop_blocking_timeout = @blocking_timeout + @parser = parser_create + end + + def start + super + + log.info "listening udp socket", bind: @bind, port: @port + server_create(:in_udp_server, @port, proto: :udp, bind: @bind, max_bytes: @body_size_limit) do |data, sock| + data.chomp! + begin + @parser.parse(data) do |time, record| + unless time && record + log.warn "pattern not match", data: data + next + end + + tag = extract_tag_from_record(record) + tag ||= @tag + time ||= extract_time_from_record(record) || Fluent::EventTime.now + record[@source_host_key] = sock.remote_host if @source_host_key + router.emit(tag, time, record) + end + end end - client = ServerEngine::SocketManager::Client.new(socket_manager_path) - @usock = client.listen_udp(@bind, @port) - SocketUtil::UdpHandler.new(@usock, log, @body_size_limit, callback) end end end diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb index c6416a1672..c53b4478cc 100755 --- a/test/plugin/test_in_tcp.rb +++ b/test/plugin/test_in_tcp.rb @@ -1,20 +1,8 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_tcp' class TcpInputTest < Test::Unit::TestCase - class << self - def startup - socket_manager_path = ServerEngine::SocketManager::Server.generate_path - @server = ServerEngine::SocketManager::Server.open(socket_manager_path) - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s - end - - def shutdown - @server.close - end - end - def setup Fluent::Test.setup end @@ -34,69 +22,97 @@ def setup ] def create_driver(conf) - Fluent::Test::InputTestDriver.new(Fluent::TcpInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::TcpInput).configure(conf) end - def test_configure - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + def create_tcp_socket(host, port, &block) + if block_given? + TCPSocket.open(host, port, &block) + else + TCPSocket.open(host, port) + end + end + + + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'configure' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? - configs.each_pair { |k, v| - d = create_driver(v) - assert_equal PORT, d.instance.port - assert_equal k, d.instance.bind - assert_equal "\n", d.instance.delimiter - } + d = create_driver(conf) + assert_equal PORT, d.instance.port + assert_equal bind, d.instance.bind + assert_equal "\n", d.instance.delimiter end - { - 'none' => [ - {'msg' => "tcptest1\n", 'expected' => 'tcptest1'}, - {'msg' => "tcptest2\n", 'expected' => 'tcptest2'}, - ], - 'json' => [ - {'msg' => {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", 'expected' => 'tcptest1'}, - {'msg' => {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", 'expected' => 456}, - ] - }.each { |format, test_cases| - define_method("test_msg_size_#{format}") do - d = create_driver(BASE_CONFIG + "format #{format}") - tests = test_cases + test_case_data = { + 'none' => { + 'format' => 'none', + 'payloads' => [ "tcptest1\n", "tcptest2\n" ], + 'expecteds' => [ + {'message' => 'tcptest1'}, + {'message' => 'tcptest2'}, + ], + }, + 'json' => { + 'format' => 'json', + 'payloads' => [ + {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", + {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", + ], + 'expecteds' => [ + {'k' => 123, 'message' => 'tcptest1'}, + {'k' => 'tcptest2', 'message' => 456} + ], + }, + } + + data(test_case_data) + test 'test_msg_size' do |data| + format = data['format'] + payloads = data['payloads'] + expecteds = data['expecteds'] - d.run do - tests.each {|test| - TCPSocket.open('127.0.0.1', PORT) do |s| - s.send(test['msg'], 0) - end - } - sleep 1 + d = create_driver(BASE_CONFIG + "format #{format}") + d.run(expect_records: 2) do + payloads.each do |payload| + create_tcp_socket('127.0.0.1', PORT) do |sock| + sock.send(payload, 0) + end end + end - compare_test_result(d.emits, tests) + assert_equal 2, d.events.size + expecteds.each_with_index do |expected_record, i| + assert_equal "tcp", d.events[i][0] + assert d.events[i][1].is_a?(Fluent::EventTime) + assert_equal expected_record, d.events[i][2] end + end - define_method("test_msg_size_with_same_connection_#{format}") do - d = create_driver(BASE_CONFIG + "format #{format}") - tests = test_cases + data(test_case_data) + test 'test data in a connection' do |data| + format = data['format'] + payloads = data['payloads'] + expecteds = data['expecteds'] - d.run do - TCPSocket.open('127.0.0.1', PORT) do |s| - tests.each {|test| - s.send(test['msg'], 0) - } + d = create_driver(BASE_CONFIG + "format #{format}") + d.run(expect_records: 2) do + create_tcp_socket('127.0.0.1', PORT) do |sock| + payloads.each do |payload| + sock.send(payload, 0) end - sleep 1 end - - compare_test_result(d.emits, tests) end - } - def compare_test_result(emits, tests) - assert_equal(2, emits.size) - emits.each_index {|i| - assert_equal(tests[i]['expected'], emits[i][2]['message']) - assert(emits[i][1].is_a?(Fluent::EventTime)) - } + assert_equal 2, d.events.size + expecteds.each_with_index do |expected_record, i| + assert_equal "tcp", d.events[i][0] + assert d.events[i][1].is_a?(Fluent::EventTime) + assert_equal expected_record, d.events[i][2] + end end end diff --git a/test/plugin/test_in_udp.rb b/test/plugin/test_in_udp.rb index 01c47c1b8a..d74588b77e 100755 --- a/test/plugin/test_in_udp.rb +++ b/test/plugin/test_in_udp.rb @@ -1,20 +1,8 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_udp' class UdpInputTest < Test::Unit::TestCase - class << self - def startup - socket_manager_path = ServerEngine::SocketManager::Server.generate_path - @server = ServerEngine::SocketManager::Server.open(socket_manager_path) - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s - end - - def shutdown - @server.close - end - end - def setup Fluent::Test.setup end @@ -34,88 +22,121 @@ def setup ! def create_driver(conf) - Fluent::Test::InputTestDriver.new(Fluent::UdpInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::UdpInput).configure(conf) end - def test_configure - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + def create_udp_socket(host, port) + u = if IPAddr.new(IPSocket.getaddress(host)).ipv4? + UDPSocket.new(Socket::AF_INET) + else + UDPSocket.new(Socket::AF_INET6) + end + u.connect(host, port) + if block_given? + begin + yield u + ensure + u.close rescue nil + end + else + u + end + end - configs.each_pair { |k, v| - d = create_driver(v) - assert_equal PORT, d.instance.port - assert_equal k, d.instance.bind - assert_equal 4096, d.instance.body_size_limit - } + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'configure' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? + + d = create_driver(conf) + assert_equal PORT, d.instance.port + assert_equal bind, d.instance.bind + assert_equal 4096, d.instance.body_size_limit end - def test_time_format - configs = {'127.0.0.1' => CONFIG} - configs.merge!('::1' => IPv6_CONFIG) if ipv6_enabled? + data( + 'ipv4' => [CONFIG, '127.0.0.1', :ipv4], + 'ipv6' => [IPv6_CONFIG, '::1', :ipv6], + ) + test 'time_format' do |data| + conf, bind, protocol = data + omit "IPv6 is not supported on this environment" if protocol == :ipv6 && !ipv6_enabled? - configs.each_pair { |k, v| - d = create_driver(v) + d = create_driver(conf) - tests = [ - {'msg' => '[Sep 11 00:00:00] localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 11 00:00:00', '%b %d %H:%M:%S'))}, - {'msg' => '[Sep 1 00:00:00] localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 1 00:00:00', '%b %d %H:%M:%S'))}, - ] + tests = [ + {'msg' => '[Sep 11 00:00:00] localhost logger: foo', 'expected' => event_time('Sep 11 00:00:00', format: '%b %d %H:%M:%S')}, + {'msg' => '[Sep 1 00:00:00] localhost logger: foo', 'expected' => event_time('Sep 1 00:00:00', format: '%b %d %H:%M:%S')}, + ] - d.run do - u = Fluent::SocketUtil.create_udp_socket(k) - u.connect(k, PORT) - tests.each {|test| + d.run(expect_records: 2) do + create_udp_socket(bind, PORT) do |u| + tests.each do |test| u.send(test['msg'], 0) - } - u.close - sleep 1 + end end + end - emits = d.emits - emits.each_index {|i| - assert_equal_event_time(tests[i]['expected'], emits[i][1]) - } - } - + events = d.events + tests.each_with_index do |t, i| + assert_equal_event_time(t['expected'], events[i][1]) + end end - { - 'none' => [ - {'msg' => "tcptest1\n", 'expected' => 'tcptest1'}, - {'msg' => "tcptest2\n", 'expected' => 'tcptest2'}, - ], - 'json' => [ - {'msg' => {'k' => 123, 'message' => 'tcptest1'}.to_json + "\n", 'expected' => 'tcptest1'}, - {'msg' => {'k' => 'tcptest2', 'message' => 456}.to_json + "\n", 'expected' => 456}, - ], - '/^\\[(?