From c1945a4e6fabbddfd0e9cf5003a66666947b82f9 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 13 Jan 2017 20:23:36 +0900 Subject: [PATCH 01/17] add a method to get initialized section object to use it instead of configuration files --- lib/fluent/config/section.rb | 4 + lib/fluent/configurable.rb | 36 +++++++- test/config/test_configurable.rb | 154 +++++++++++++++++++++++++++++++ 3 files changed, 189 insertions(+), 5 deletions(-) diff --git a/lib/fluent/config/section.rb b/lib/fluent/config/section.rb index 81b8954b1a..a61cb055b7 100644 --- a/lib/fluent/config/section.rb +++ b/lib/fluent/config/section.rb @@ -75,6 +75,10 @@ def [](key) @params[key.to_sym] end + def []=(key, value) + @params[key.to_sym] = value + end + def respond_to?(symbol, include_all=false) case symbol when :inspect, :nil?, :to_h, :+, :instance_of?, :kind_of?, :[], :respond_to?, :respond_to_missing? diff --git a/lib/fluent/configurable.rb b/lib/fluent/configurable.rb index 4b8c6ac6ca..7d5d4cf60b 100644 --- a/lib/fluent/configurable.rb +++ b/lib/fluent/configurable.rb @@ -48,12 +48,8 @@ def initialize end end - def configure(conf) - @config = conf - - logger = self.respond_to?(:log) ? log : (defined?($log) ? $log : nil) + def configure_proxy_generate proxy = self.class.merged_configure_proxy - conf.corresponding_proxies << proxy if self.respond_to?(:owner) && self.owner owner_proxy = owner.class.merged_configure_proxy @@ -63,6 +59,36 @@ def configure(conf) proxy.overwrite_defaults(owner_proxy) if owner_proxy end + proxy + end + + def configured_section_create(name, conf = nil) + conf ||= Fluent::Config::Element.new(name.to_s, '', {}, []) + root_proxy = configure_proxy_generate + proxy = if name.nil? # root + root_proxy + else + root_proxy.sections[name] + end + # take care to raise Fluent::ConfigError if conf mismatched to proxy + Fluent::Config::SectionGenerator.generate(proxy, conf, nil, nil) + end + + def configure(conf) + @config = conf + + logger = if self.respond_to?(:log) + self.log + elsif self.respond_to?(:owner) && self.owner.respond_to?(:log) + self.owner.log + elsif defined?($log) + $log + else + nil + end + proxy = configure_proxy_generate + conf.corresponding_proxies << proxy + # In the nested section, can't get plugin class through proxies so get plugin class here plugin_class = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s) root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class) diff --git a/test/config/test_configurable.rb b/test/config/test_configurable.rb index 6cf4bc9cf7..66d34b0847 100644 --- a/test/config/test_configurable.rb +++ b/test/config/test_configurable.rb @@ -24,6 +24,12 @@ def get_all end end + class Base1Safe < Base1 + config_set_default :name1, "basex1" + config_set_default :name2, "basex2" + config_set_default :opt1, :baz + end + class Base2 < Base1 config_set_default :name2, "base2" config_set_default :name4, "base2" @@ -89,6 +95,40 @@ def get_all end end + class Base4Safe < Base4 + # config_section :node, param_name: :nodes do + # config_argument :num, :integer + # config_param :name, :string, default: "node" + # config_param :type, :string, default: "b4" + # end + # config_section :description1, required: false, multi: false do + # config_argument :note, :string, default: "desc1" + # config_param :text, :string + # end + # config_section :description2, required: true, multi: false do + # config_argument :note, :string, default: "desc2" + # config_param :text, :string + # end + # config_section :description3, required: true, multi: true do + # config_argument :note, default: "desc3" do |val| + # "desc3: #{val}" + # end + # config_param :text, :string + # end + config_section :node do + config_set_default :num, 0 + end + config_section :description1 do + config_set_default :text, "teeeext" + end + config_section :description2 do + config_set_default :text, nil + end + config_section :description3 do + config_set_default :text, "yay" + end + end + class Init0 include Fluent::Configurable config_section :sec1, init: true, multi: false do @@ -407,6 +447,61 @@ class TestConfigurable < ::Test::Unit::TestCase end end + sub_test_case '#configured_section_create' do + test 'raises configuration error if required param exists but no configuration element is specified' do + obj = ConfigurableSpec::Base1.new + assert_raise(Fluent::ConfigError.new("'name1' parameter is required")) do + obj.configured_section_create(nil) + end + end + + test 'creates root section with default values if name and config are specified with nil' do + obj = ConfigurableSpec::Base1Safe.new + root = obj.configured_section_create(nil) + + assert_equal "node", root.node + assert_false root.flag1 + assert_true root.flag2 + assert_equal "basex1", root.name1 + assert_equal "basex2", root.name2 + assert_equal "base1", root.name3 + assert_equal "base1", root.name4 + assert_equal :baz, root.opt1 + assert_equal :foo, root.opt2 + end + + test 'creates root section with default values if name is nil and config is empty element' do + obj = ConfigurableSpec::Base1Safe.new + root = obj.configured_section_create(nil, config_element()) + + assert_equal "node", root.node + assert_false root.flag1 + assert_true root.flag2 + assert_equal "basex1", root.name1 + assert_equal "basex2", root.name2 + assert_equal "base1", root.name3 + assert_equal "base1", root.name4 + assert_equal :baz, root.opt1 + assert_equal :foo, root.opt2 + end + + test 'creates root section with specified value if name is nil and configuration element is specified' do + obj = ConfigurableSpec::Base1Safe.new + root = obj.configured_section_create(nil, config_element('match', '', {'node' => "nodename", 'flag1' => 'true', 'name1' => 'fixed1', 'opt1' => 'foo'})) + + assert_equal "nodename", root.node + assert_equal "fixed1", root.name1 + assert_true root.flag1 + assert_equal :foo, root.opt1 + + assert_true root.flag2 + assert_equal "basex2", root.name2 + assert_equal "base1", root.name3 + assert_equal "base1", root.name4 + assert_equal :foo, root.opt2 + end + end + sub_test_case '#configure' do test 'returns configurable object itself' do b2 = ConfigurableSpec::Base2.new @@ -526,6 +621,65 @@ class TestConfigurable < ::Test::Unit::TestCase end end + sub_test_case '#configured_section_create' do + test 'raises configuration error if required param exists but no configuration element is specified' do + obj = ConfigurableSpec::Base4.new + assert_raise(Fluent::ConfigError.new("'' section requires argument")) do + obj.configured_section_create(:node) + end + assert_raise(Fluent::ConfigError.new("'text' parameter is required")) do + obj.configured_section_create(:description1) + end + end + + test 'creates any defined section with default values if name is nil and config is not specified' do + obj = ConfigurableSpec::Base4Safe.new + node = obj.configured_section_create(:node) + assert_equal 0, node.num + assert_equal "node", node.name + assert_equal "b4", node.type + + desc1 = obj.configured_section_create(:description1) + assert_equal "desc1", desc1.note + assert_equal "teeeext", desc1.text + end + + test 'creates any defined section with default values if name is nil and config is empty element' do + obj = ConfigurableSpec::Base4Safe.new + node = obj.configured_section_create(:node, config_element()) + assert_equal 0, node.num + assert_equal "node", node.name + assert_equal "b4", node.type + + desc1 = obj.configured_section_create(:description1, config_element()) + assert_equal "desc1", desc1.note + assert_equal "teeeext", desc1.text + end + + test 'creates any defined section with specified value if name is nil and configuration element is specified' do + obj = ConfigurableSpec::Base4Safe.new + node = obj.configured_section_create(:node, config_element('node', '1', {'name' => 'node1', 'type' => 'b1'})) + assert_equal 1, node.num + assert_equal "node1", node.name + assert_equal "b1", node.type + + desc1 = obj.configured_section_create(:description1, config_element('description1', 'desc one', {'text' => 't'})) + assert_equal "desc one", desc1.note + assert_equal "t", desc1.text + end + + test 'creates a defined section instance even if it is defined as multi:true' do + obj = ConfigurableSpec::Base4Safe.new + desc3 = obj.configured_section_create(:description3) + assert_equal "desc3", desc3.note + assert_equal "yay", desc3.text + + desc3 = obj.configured_section_create(:description3, config_element('description3', 'foo')) + assert_equal "desc3: foo", desc3.note + assert_equal "yay", desc3.text + end + end + sub_test_case '#configure' do BASE_ATTRS = { "name1" => "1", "name2" => "2", "name3" => "3", From e4cd065a12febd996c7cfa6a4f9034357ea568c1 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 13 Jan 2017 20:24:39 +0900 Subject: [PATCH 02/17] add TLS support --- lib/fluent/plugin_helper/cert_option.rb | 142 +++++++++++ lib/fluent/plugin_helper/server.rb | 287 ++++++++++++++++++++-- lib/fluent/plugin_helper/socket_option.rb | 19 +- test/plugin_helper/test_server.rb | 157 ++++++++---- 4 files changed, 522 insertions(+), 83 deletions(-) create mode 100644 lib/fluent/plugin_helper/cert_option.rb diff --git a/lib/fluent/plugin_helper/cert_option.rb b/lib/fluent/plugin_helper/cert_option.rb new file mode 100644 index 0000000000..e03cc27c0e --- /dev/null +++ b/lib/fluent/plugin_helper/cert_option.rb @@ -0,0 +1,142 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'openssl' +require 'socket' + +# this module is only for Socket/Server plugin helpers +module Fluent + module PluginHelper + module CertOption + def cert_option_create_context(version, insecure, ciphers, conf) + cert, key, extra_chain_certs = cert_option_server_validate!(conf) + + ctx = OpenSSL::SSL::SSLContext.new(version) + unless insecure + # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS + # https://bugs.ruby-lang.org/issues/9424 + ctx.set_params({}) + + ctx.ciphers = ciphers + end + + ctx.cert = cert + ctx.key = key + if extra_chain_certs + ctx.extra_chain_cert = extra_chain_certs + end + + ctx + end + + def cert_option_server_validate!(conf) + case + when conf.cert_path + raise Fluent::ConfigError, "private_key_path is required when cert_path is specified" unless conf.private_key_path + raise Fluent::ConfigError, "private_key_passphrase is required when cert_path is specified" unless conf.private_key_passphrase + cert_option_load(conf.cert_path, conf.private_key_path, conf.private_key_passphrase) + + when conf.ca_cert_path + raise Fluent::ConfigError, "ca_private_key_path is required when ca_cert_path is specified" unless conf.ca_private_key_path + raise Fluent::ConfigError, "ca_private_key_passphrase is required when ca_cert_path is specified" unless conf.ca_private_key_passphrase + generate_opts = cert_option_cert_generation_opts_from_conf(conf) + cert_option_generate_server_pair_by_ca( + conf.ca_cert_path, + conf.ca_private_key_path, + conf.private_key_passphrase, + generate_opts + ) + + when conf.insecure + log.warn "insecure TLS communication server is configured (using 'insecure' mode)" + generate_opts = cert_option_cert_generation_opts_from_conf(conf) + cert_option_generate_server_pair_self_signed(generate_opts) + + else + raise Fluent::ConfigError, "no valid cert options configured. specify either 'cert_path', 'ca_cert_path' or 'insecure'" + end + end + + def cert_option_load(cert_path, private_key_path, private_key_passphrase) + key = OpenSSL::PKey::RSA.new(File.read(private_key_path), private_key_passphrase) + certs = cert_option_certificates_from_file(cert_path) + cert = certs.shift + extra_chain_certs = certs + return cert, key, extra_chain_certs + end + + def cert_option_cert_generation_opts_from_conf(conf) + { + private_key_length: conf.generate_private_key_length, + country: conf.generate_cert_country, + state: conf.generate_cert_state, + locality: conf.generate_cert_locality, + common_name: conf.generate_cert_common_name || ::Socket.gethostname, + expiration: conf.generate_cert_expiration, + digest: conf.generate_cert_digest, + } + end + + def cert_option_generate_server_pair(opts, issuer = nil) + key = OpenSSL::PKey::RSA.generate(opts[:private_key_length]) + + subject = OpenSSL::X509::Name.new + subject.add_entry('C', opts[:country]) + subject.add_entry('ST', opts[:state]) + subject.add_entry('L', opts[:locality]) + subject.add_entry('CN', opts[:common_name]) + + issuer ||= subject + + cert = OpenSSL::X509::Certificate.new + cert.not_before = Time.at(0) + cert.not_after = Time.now + opts[:expiration] + cert.public_key = key + cert.serial = 1 + cert.issuer = issuer + cert.subject = subject + + # basicConstraints: this cert is for CA or not + cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(false)])) + cert.add_extension OpenSSL::X509::Extension.new('nsCertType', 'server') + + return cert, key + end + + def cert_option_generate_server_pair_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, generate_opts) + ca_key = OpenSSL::PKey::RSA.new(File.read(ca_key_path), ca_key_passphrase) + ca_cert = OpenSSL::X509::Certificate.new(File.read(ca_cert_path)) + cert, key = cert_option_generate_server_pair(generate_opts, ca_cert.issuer) + cert.sign(ca_key, generate_opts[:digest].to_s) + return cert, key, nil + end + + def cert_option_generate_server_pair_self_signed(generate_opts) + cert, key = cert_option_generate_server_pair(generate_opts) + cert.sign(key, generate_opts[:digest].to_s) + return cert, key, nil + end + + def cert_option_certificates_from_file(path) + data = File.read(path) + pattern = Regexp.compile('-+BEGIN CERTIFICATE-+\n(?:[^-]*\n)+-+END CERTIFICATE-+\n', Regexp::MULTILINE) + list = [] + data.scan(pattern){|match| list << OpenSSL::X509::Certificate.new(match) } + list + end + end + end +end diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index d13fdd9f3a..5237955509 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -21,18 +21,21 @@ require 'socket' require 'ipaddr' require 'fcntl' +require 'openssl' require_relative 'socket_option' +require_relative 'cert_option' module Fluent module PluginHelper module Server include Fluent::PluginHelper::EventLoop include Fluent::PluginHelper::SocketOption + include Fluent::PluginHelper::CertOption # This plugin helper doesn't support these things for now: - # * SSL/TLS (TBD) # * TCP/TLS keepalive + # * unix domain sockets # stop : [-] # shutdown : detach server event handler from event loop (event_loop) @@ -63,12 +66,16 @@ def server_wait_until_stop # conn.close # end # end - def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, **socket_options, &block) + def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block) + proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp + raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto) + raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls + raise ArgumentError, "BUG: block not specified which handles connection" unless block_given? raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1 @@ -83,11 +90,14 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: when :tcp server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) when :tls - raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts - # server_certopts_validate!(certopts) - # sock = server_create_tls_socket(shared, bind, port) - # server = nil # ... - raise "not implemented yet" + transport_config = if tls_options + server_create_transport_section_object(tls_options) + elsif @transport_config && @transport_config.protocol == :tls + @transport_config + else + raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified" + end + server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block) when :unix raise "not implemented yet" else @@ -108,12 +118,15 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: # sock.remote_port # # ... # end - def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback) + def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback) + proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp + raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol) raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer) raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto) raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp + raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls raise ArgumentError, "BUG: block not specified which handles received data" unless block_given? raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2 @@ -141,7 +154,16 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, socke conn.data(&callback) end when :tls - raise "not implemented yet" + transport_config = if tls_options + server_create_transport_section_object(tls_options) + elsif @transport_config && @transport_config.protocol == :tls + @transport_config + else + raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified" + end + server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn| + conn.data(&callback) + end when :udp raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes if socket @@ -198,6 +220,80 @@ def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_ server end + def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block) + context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf) + sock = server_create_tcp_socket(shared, bind, port) + socket_option_setter.call(sock) + close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } } + server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn| + @_server_mutex.synchronize do + @_server_connections << conn + end + end + server.listen(backlog) if backlog + server + end + + SERVER_TRANSPORT_PARAMS = [ + :protocol, :version, :ciphers, :insecure, + :cert_path, :private_key_path, :private_key_passphrase, + :ca_cert_path, :ca_private_key_path, :ca_private_key_passphrase, + :generate_private_key_length, + :generate_cert_country, :generate_cert_state, :generate_cert_state, + :generate_cert_locality, :generate_cert_common_name, + :generate_cert_expiration, :generate_cert_digest, + ] + + def server_create_transport_section_object(opts) + transport_section = configured_section_create(:transport) + SERVER_TRANSPORT_PARAMS.each do |param| + if opts.has_key?(param) + transport_section[param] = opts[param] + end + end + transport_section + end + + module ServerTransportParams + TLS_DEFAULT_VERSION = :'TLSv1_2' + TLS_SUPPORTED_VERSIONS = [:'TLSv1_1', :'TLSv1_2'] + ### follow httpclient configuration by nahi + # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH" + CIPHERS_DEFAULT = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default + + include Fluent::Configurable + config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do + config_argument :protocol, :enum, list: [:tcp, :tls], default: :tcp + config_param :version, :enum, list: TLS_SUPPORTED_VERSIONS, default: TLS_DEFAULT_VERSION + + config_param :ciphers, :string, default: CIPHERS_DEFAULT + config_param :insecure, :bool, default: false + + # Cert signed by public CA + config_param :cert_path, :string, default: nil + config_param :private_key_path, :string, default: nil + config_param :private_key_passphrase, :string, default: nil, secret: true + + # Cert generated and signed by private CA Certificate + config_param :ca_cert_path, :string, default: nil + config_param :ca_private_key_path, :string, default: nil + config_param :ca_private_key_passphrase, :string, default: nil, secret: true + + # Options for generating certs by private CA certs or self-signed + config_param :generate_private_key_length, :integer, default: 2048 + config_param :generate_cert_country, :string, default: 'US' + config_param :generate_cert_state, :string, default: 'CA' + config_param :generate_cert_locality, :string, default: 'Mountain View' + config_param :generate_cert_common_name, :string, default: nil + config_param :generate_cert_expiration, :time, default: 10 * 365 * 86400 # 10years later + config_param :generate_cert_digest, :enum, list: [:sha1, :sha256, :sha384, :sha512], default: :sha256 + end + end + + def self.included(mod) + mod.include ServerTransportParams + end + def initialize super @_servers = [] @@ -205,6 +301,16 @@ def initialize @_server_mutex = Mutex.new end + def configure(conf) + super + + if @transport_config + if @transport_config.protocol == :tls + cert_option_server_validate!(@transport_config) + end + end + end + def stop @_server_mutex.synchronize do @_servers.each do |si| @@ -230,10 +336,6 @@ def terminate super end - def server_certopts_validate!(certopts) - raise "not implemented yet" - end - def server_socket_manager_client socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] if Fluent.windows? @@ -267,28 +369,25 @@ def server_create_udp_socket(shared, bind, port) sock end - def server_create_tls_socket(shared, bind, port) - raise "not implemented yet" - end - class CallbackSocket def initialize(server_type, sock, enabled_events = [], close_socket: true) @server_type = server_type @sock = sock + @peeraddr = nil @enabled_events = enabled_events @close_socket = close_socket end def remote_addr - @sock.peeraddr[3] + @peeraddr[3] end def remote_host - @sock.peeraddr[2] + @peeraddr[2] end def remote_port - @sock.peeraddr[1] + @peeraddr[1] end def send(data, flags = 0) @@ -327,6 +426,18 @@ def on(event, &callback) class TCPCallbackSocket < CallbackSocket def initialize(sock) super("tcp", sock, [:data, :write_complete, :close]) + @peeraddr = @sock.peeraddr + end + + def write(data) + @sock.write(data) + end + end + + class TLSCallbackSocket < CallbackSocket + def initialize(sock) + super("tls", sock, [:data, :write_complete, :close]) + @peeraddr = @sock.to_io.peeraddr end def write(data) @@ -430,6 +541,10 @@ def initialize(sock, socket_option_setter, close_callback, log, under_plugin_dev @mutex = Mutex.new # to serialize #write and #close end + def to_io + @_handler_socket + end + def data(&callback) raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read) @data_callback = callback @@ -459,7 +574,135 @@ def on_connect def on_read_without_connection(data) @data_callback.call(data) rescue => e - @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e + @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e + @log.error_backtrace + close(true) rescue nil + raise if @under_plugin_development + end + + def on_read_with_connection(data) + @data_callback.call(data, @callback_connection) + rescue => e + @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e + @log.error_backtrace + close(true) rescue nil + raise if @under_plugin_development + end + + def close + @mutex.synchronize do + return if @closing + @closing = true + @close_callback.call(self) + super + end + end + end + + class TLSServer < Coolio::Socket + # It can't use Coolio::TCPSocket, because Coolio::TCPSocket checks that underlying socket (1st argument of super) is TCPSocket. + def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) + raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket) + + socket_option_setter.call(sock) + @_handler_socket = OpenSSL::SSL::SSLSocket.new(sock, context) + @_handler_write_buffer = ''.force_encoding('ascii-8bit') + @_handler_accepted = false + super(@_handler_socket) + + @log = log + @under_plugin_development = under_plugin_development + + @connect_callback = connect_callback + @data_callback = nil + @close_callback = close_callback + + @callback_connection = nil + @closing = false + + @mutex = Mutex.new # to serialize #write and #close + end + + def to_io + @_handler_socket.to_io + end + + def data(&callback) + raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read) + @data_callback = callback + on_read_impl = case callback.arity + when 1 then :on_read_without_connection + when 2 then :on_read_with_connection + else + raise "BUG: callback block must have 1 or 2 arguments" + end + self.define_singleton_method(:on_read, method(on_read_impl)) + end + + def write(data) + @mutex.synchronize do + @_handler_write_buffer << data + schedule_write + data.bytesize + end + end + + def try_tls_accept + return if @_handler_accepted + + begin + @_handler_socket.accept_nonblock # this method call actually try to do handshake via TLS + @_handler_accepted = true + + @callback_connection = TLSCallbackSocket.new(self) + @connect_callback.call(@callback_connection) + unless @data_callback + raise "connection callback must call #data to set data callback" + end + rescue IO::WaitReadable, IO::WaitWritable + # retry accept_nonblock: there aren't enough data in underlying socket buffer + rescue OpenSSL::SSL::SSLError => e + @log.trace "unexpected error before accepting TLS connection", error: e + close rescue nil + end + end + + def on_connect + try_tls_accept + end + + def on_readable + if @_handler_accepted + super + else + try_tls_accept + end + end + + def on_writable + begin + @mutex.synchronize do + written_bytes = @_handler_socket.write_nonblock(@_handler_write_buffer) + @_handler_write_buffer.slice!(0, written_bytes) + super + end + rescue IO::WaitWritable, IO::WaitReadable + return + rescue Errno::EINTR + return + rescue SystemCallError, IOError, SocketError + # SystemCallError catches Errno::EPIPE & Errno::ECONNRESET amongst others. + close rescue nil + return + rescue OpenSSL::SSL::SSLError => e + @log.debug "unexpected SSLError while writing data into socket connected via TLS", error: e + end + end + + def on_read_without_connection(data) + @data_callback.call(data) + rescue => e + @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e @log.error_backtrace close(true) rescue nil raise if @under_plugin_development @@ -468,7 +711,7 @@ def on_read_without_connection(data) def on_read_with_connection(data) @data_callback.call(data, @callback_connection) rescue => e - @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e + @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e @log.error_backtrace close(true) rescue nil raise if @under_plugin_development diff --git a/lib/fluent/plugin_helper/socket_option.rb b/lib/fluent/plugin_helper/socket_option.rb index 82ad59ccf0..77fd9e3b66 100644 --- a/lib/fluent/plugin_helper/socket_option.rb +++ b/lib/fluent/plugin_helper/socket_option.rb @@ -24,7 +24,7 @@ module SocketOption FORMAT_STRUCT_LINGER = 'I!I!' # { int l_onoff; int l_linger; } FORMAT_STRUCT_TIMEVAL = 'L!L!' # { time_t tv_sec; suseconds_t tv_usec; } - def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil) + def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil) unless resolve_name.nil? if protocol != :tcp && protocol != :udp && protocol != :tls raise ArgumentError, "BUG: resolve_name in available for tcp/udp/tls" @@ -35,23 +35,9 @@ def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, re raise ArgumentError, "BUG: linger_timeout is available for tcp/tls" end end - if certopts - if protocol != :tls - raise ArgumentError, "BUG: certopts is available only for tls" - end - else - if protocol == :tls - raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" - socket_option_certopts_validate!(certopts) - end - end - end - - def socket_option_certopts_validate!(certopts) - raise "not implemented yet" end - def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil) + def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil) unless resolve_name.nil? sock.do_not_reverse_lookup = !resolve_name end @@ -70,7 +56,6 @@ def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: optval = [send_timeout.to_i, 0].pack(FORMAT_STRUCT_TIMEVAL) socket_option_set_one(sock, :SO_SNDTIMEO, optval) end - # TODO: certopts for TLS sock end diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index ae906de967..b88d606f5a 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -141,11 +141,49 @@ class Dummy < Fluent::Plugin::TestBase assert created_server.is_a?(Coolio::TCPServer) end - # tests about "proto: :udp" is in #server_create + data(methods) + test 'creates tls server in default if transport section and tcp protocol specified' do |m| + @d = d = Dummy.new + transport_conf = config_element('transport', 'tcp', {}, []) + d.configure(config_element('ROOT', '', {}, [transport_conf])) + d.start + d.after_start + + d.__send__(m, :myserver, PORT){|x| x } + + created_server_info = @d._servers.first + assert_equal :tcp, created_server_info.proto + created_server = created_server_info.server + assert created_server.is_a?(Coolio::TCPServer) + end data(methods) test 'creates tls server if specified in proto' do |m| - # pend "not implemented yet" + assert_raise(ArgumentError.new("BUG: TLS transport specified, but certification options are not specified")) do + @d.__send__(m, :myserver, PORT, proto: :tls){|x| x } + end + @d.__send__(m, :myserver, PORT, proto: :tls, tls_options: {insecure: true}){|x| x } + + created_server_info = @d._servers.first + assert_equal :tls, created_server_info.proto + created_server = created_server_info.server + assert created_server.is_a?(Coolio::TCPServer) # yes, TCP here + end + + data(methods) + test 'creates tls server in default if transport section and tls protocol specified' do |m| + @d = d = Dummy.new + transport_conf = config_element('transport', 'tls', {'insecure' => 'true'}, []) + d.configure(config_element('ROOT', '', {}, [transport_conf])) + d.start + d.after_start + + d.__send__(m, :myserver, PORT){|x| x } + + created_server_info = @d._servers.first + assert_equal :tls, created_server_info.proto + created_server = created_server_info.server + assert created_server.is_a?(Coolio::TCPServer) # OK, it's Coolio::TCPServer end data(methods) @@ -162,10 +200,10 @@ class Dummy < Fluent::Plugin::TestBase data( 'server_create tcp' => [:server_create, :tcp], - # 'server_create tls' => [:server_create, :tls], + 'server_create tls' => [:server_create, :tls], # 'server_create unix' => [:server_create, :unix], 'server_create_connection tcp' => [:server_create_connection, :tcp], - # 'server_create_connection tcp' => [:server_create_connection, :tls], + 'server_create_connection tls' => [:server_create_connection, :tls], # 'server_create_connection tcp' => [:server_create_connection, :unix], ) test 'raise error if udp options specified for tcp/tls/unix' do |(m, proto)| @@ -203,17 +241,17 @@ class Dummy < Fluent::Plugin::TestBase # 'server_create_connection unix' => [:server_create_connection, :unix, {}], ) test 'raise error if tls options specified for tcp/udp/unix' do |(m, proto, kwargs)| - assert_raise(ArgumentError.new("BUG: certopts is available only for tls")) do - @d.__send__(m, :myserver, PORT, proto: proto, certopts: {}, **kwargs){|x| x } + assert_raise(ArgumentError.new("BUG: tls_options is available only for tls")) do + @d.__send__(m, :myserver, PORT, proto: proto, tls_options: {}, **kwargs){|x| x } end end data( 'server_create tcp' => [:server_create, :tcp, {}], 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], - # 'server_create tls' => [:server_create, :tls, {}], + 'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}], 'server_create_connection tcp' => [:server_create_connection, :tcp, {}], - # 'server_create_connection tls' => [:server_create_connection, :tls, {}], + 'server_create_connection tls' => [:server_create_connection, :tls, {tls_options: {insecure: true}}], ) test 'can bind specified IPv4 address' do |(m, proto, kwargs)| @d.__send__(m, :myserver, PORT, proto: proto, bind: "127.0.0.1", **kwargs){|x| x } @@ -224,9 +262,9 @@ class Dummy < Fluent::Plugin::TestBase data( 'server_create tcp' => [:server_create, :tcp, {}], 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], - # 'server_create tls' => [:server_create, :tls, {}], + 'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}], 'server_create_connection tcp' => [:server_create_connection, :tcp, {}], - # 'server_create_connection tls' => [:server_create_connection, :tls, {}], + 'server_create_connection tls' => [:server_create_connection, :tls, {tls_options: {insecure: true}}], ) test 'can bind specified IPv6 address' do |(m, proto, kwargs)| # if available omit "IPv6 unavailable here" unless ipv6_enabled? @@ -238,10 +276,10 @@ class Dummy < Fluent::Plugin::TestBase data( 'server_create tcp' => [:server_create, :tcp, {}], 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], - # 'server_create tls' => [:server_create, :tls, {}], + 'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}], # 'server_create unix' => [:server_create, :unix, {}], 'server_create_connection tcp' => [:server_create, :tcp, {}], - # 'server_create_connection tls' => [:server_create, :tls, {}], + 'server_create_connection tls' => [:server_create, :tls, {tls_options: {insecure: true}}], # 'server_create_connection unix' => [:server_create, :unix, {}], ) test 'can create 2 or more servers which share same bind address and port if shared option is true' do |(m, proto, kwargs)| @@ -260,10 +298,10 @@ class Dummy < Fluent::Plugin::TestBase data( 'server_create tcp' => [:server_create, :tcp, {}], 'server_create udp' => [:server_create, :udp, {max_bytes: 128}], - # 'server_create tls' => [:server_create, :tls, {}], + 'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}], # 'server_create unix' => [:server_create, :unix, {}], 'server_create_connection tcp' => [:server_create, :tcp, {}], - # 'server_create_connection tls' => [:server_create, :tls, {}], + 'server_create_connection tls' => [:server_create, :tls, {tls_options: {insecure: true}}], # 'server_create_connection unix' => [:server_create, :unix, {}], ) test 'cannot create 2 or more servers using same bind address and port if shared option is false' do |(m, proto, kwargs)| @@ -286,7 +324,7 @@ class Dummy < Fluent::Plugin::TestBase data( 'tcp' => [:tcp, {}], 'udp' => [:udp, {max_bytes: 128}], - # 'tls' => [:tls, {}], + 'tls' => [:tls, {tls_options: {insecure: true}}], # 'unix' => [:unix, {}], ) test 'raise error if block argument is not specified or too many' do |(proto, kwargs)| @@ -696,25 +734,36 @@ class Dummy < Fluent::Plugin::TestBase end end - sub_test_case '#server_create_tls' do - # not implemented yet + def create_ca + end - # test 'can accept all keyword arguments valid for tcp/tls server' - # test 'creates a tls server just to read data' - # test 'creates a tls server to read and write data' - # test 'creates a tls server to read and write data using IPv6' + def create_ca_chained + end + + def create_server_pair_signed_by_ca + end + + def create_server_pair_signed_by_self + end + + sub_test_case '#server_create_tls' do + test 'can accept all keyword arguments valid for tcp/tls server' + test 'creates a tls server just to read data' + test 'creates a tls server to read and write data' + test 'creates a tls server to read and write data using IPv6' - # many tests about certops + # TODO: many tests about tls_options + # TODO: many tests about sections - # test 'does not resolve name of client address in default' - # test 'does resolve name of client address if resolve_name is true' + test 'does not resolve name of client address in default' + test 'does resolve name of client address if resolve_name is true' # test 'can keep connections alive for tls if keepalive specified' do # pend "not implemented yet" # end - # test 'raises error if plugin registers data callback for connection object from #server_create' - # test 'can call write_complete callback if registered' - # test 'can call close callback if registered' + test 'raises error if plugin registers data callback for connection object from #server_create' + test 'can call write_complete callback if registered' + test 'can call close callback if registered' end sub_test_case '#server_create_unix' do @@ -729,6 +778,22 @@ class Dummy < Fluent::Plugin::TestBase # test 'can call close callback if registered' end + def open_client(proto, addr, port) + client = case proto + when :tcp + TCPSocket.open(addr, port) + when :tls + c = OpenSSL::SSL::SSLSocket.new(TCPSocket.open(addr, port)) + c.sync_close = true + c.connect + else + raise ArgumentError, "unknown proto:#{proto}" + end + yield client + ensure + client.close rescue nil + end + # run tests for tcp, tls and unix sub_test_case '#server_create_connection' do test 'raise error if udp is specified in proto' do @@ -737,10 +802,10 @@ class Dummy < Fluent::Plugin::TestBase end end - # def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block) + # def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, tls_options: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block) protocols = { 'tcp' => [:tcp, {}], - # 'tls' => [:tls, {certopts: {}}], + 'tls' => [:tls, {tls_options: {insecure: true}}], # 'unix' => [:unix, {path: ""}], } @@ -766,7 +831,7 @@ class Dummy < Fluent::Plugin::TestBase end end 3.times do - TCPSocket.open("127.0.0.1", PORT) do |sock| + open_client(proto, "127.0.0.1", PORT) do |sock| sock.puts "yay" end end @@ -788,7 +853,7 @@ class Dummy < Fluent::Plugin::TestBase end end 3.times do - TCPSocket.open("127.0.0.1", PORT) do |sock| + open_client(proto, "127.0.0.1", PORT) do |sock| sock.puts "yay" end end @@ -816,20 +881,26 @@ class Dummy < Fluent::Plugin::TestBase end replied = [] disconnecteds = [] - 3.times do - TCPSocket.open("127.0.0.1", PORT) do |sock| + 3.times do |i| + open_client(proto, "127.0.0.1", PORT) do |sock| sock.puts "yay" while line = sock.readline replied << line break end sock.write "x" + connection_closed = false begin - sock.read + data = sock.read + if data.empty? + connection_closed = true + end rescue => e if e.is_a?(Errno::ECONNRESET) - disconnecteds << e.class + connection_closed = true end + ensure + disconnecteds << connection_closed end end end @@ -838,7 +909,7 @@ class Dummy < Fluent::Plugin::TestBase waiting(10){ sleep 0.1 until disconnecteds.size == 3 } assert_equal ["yay\n", "yay\n", "yay\n"], lines assert_equal ["foo!\n", "foo!\n", "foo!\n"], replied - assert_equal [Errno::ECONNRESET, Errno::ECONNRESET, Errno::ECONNRESET], disconnecteds + assert_equal [true, true, true], disconnecteds end data(protocols) @@ -860,7 +931,7 @@ class Dummy < Fluent::Plugin::TestBase end replied = [] 3.times do - TCPSocket.open("127.0.0.1", PORT) do |sock| + open_client(proto, "127.0.0.1", PORT) do |sock| sock.puts "yay" while line = sock.readline replied << line @@ -888,7 +959,7 @@ class Dummy < Fluent::Plugin::TestBase end end 3.times do - TCPSocket.open("127.0.0.1", PORT) do |sock| + open_client(proto, "127.0.0.1", PORT) do |sock| sock.puts "yay" end end @@ -901,7 +972,7 @@ class Dummy < Fluent::Plugin::TestBase test 'will refuse more connect requests after stop, but read data from sockets already connected, in non-shared server' do |(proto, kwargs)| connected = false begin - TCPSocket.open("127.0.0.1", PORT) do |sock| + open_client(proto, "127.0.0.1", PORT) do |sock| # expected behavior is connection refused... connected = true end @@ -919,7 +990,7 @@ class Dummy < Fluent::Plugin::TestBase end th0 = Thread.new do - TCPSocket.open("127.0.0.1", PORT) do |sock| + open_client(proto, "127.0.0.1", PORT) do |sock| sock.puts "yay" sock.readline end @@ -928,14 +999,12 @@ class Dummy < Fluent::Plugin::TestBase value0 = waiting(5){ th0.value } assert_equal "ack\n", value0 - # TODO: change how to create clients by proto - stopped = false sleeping = false ending = false th1 = Thread.new do - TCPSocket.open("127.0.0.1", PORT) do |sock| + open_client(proto, "127.0.0.1", PORT) do |sock| sleeping = true sleep 0.1 until stopped sock.puts "yay" @@ -958,7 +1027,7 @@ class Dummy < Fluent::Plugin::TestBase th2 = Thread.new do begin - TCPSocket.open("127.0.0.1", PORT) do |sock| + open_client(proto, "127.0.0.1", PORT) do |sock| sock.puts "foo" end false # failed From 6ca47e81273dee8b962fe125e3a79ca750c9d729 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 17 Jan 2017 19:05:27 +0900 Subject: [PATCH 03/17] fix to raise error for unspecified digest --- lib/fluent/plugin_helper/cert_option.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/fluent/plugin_helper/cert_option.rb b/lib/fluent/plugin_helper/cert_option.rb index e03cc27c0e..0004667d20 100644 --- a/lib/fluent/plugin_helper/cert_option.rb +++ b/lib/fluent/plugin_helper/cert_option.rb @@ -120,12 +120,14 @@ def cert_option_generate_server_pair_by_ca(ca_cert_path, ca_key_path, ca_key_pas ca_key = OpenSSL::PKey::RSA.new(File.read(ca_key_path), ca_key_passphrase) ca_cert = OpenSSL::X509::Certificate.new(File.read(ca_cert_path)) cert, key = cert_option_generate_server_pair(generate_opts, ca_cert.issuer) + raise "BUG: certificate digest algorithm not set" unless generate_opts[:digest] cert.sign(ca_key, generate_opts[:digest].to_s) return cert, key, nil end def cert_option_generate_server_pair_self_signed(generate_opts) cert, key = cert_option_generate_server_pair(generate_opts) + raise "BUG: certificate digest algorithm not set" unless generate_opts[:digest] cert.sign(key, generate_opts[:digest].to_s) return cert, key, nil end From 5a72ae1a0465f6b7b7003e5541619fb2d8699899 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 17 Jan 2017 19:06:38 +0900 Subject: [PATCH 04/17] fix to call on_readable callback right after TLS handshake establishment --- lib/fluent/plugin_helper/server.rb | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index 5237955509..a2a9fae468 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -606,6 +606,7 @@ def initialize(sock, context, socket_option_setter, close_callback, log, under_p socket_option_setter.call(sock) @_handler_socket = OpenSSL::SSL::SSLSocket.new(sock, context) + @_handler_socket.sync_close = true @_handler_write_buffer = ''.force_encoding('ascii-8bit') @_handler_accepted = false super(@_handler_socket) @@ -648,7 +649,7 @@ def write(data) end def try_tls_accept - return if @_handler_accepted + return true if @_handler_accepted begin @_handler_socket.accept_nonblock # this method call actually try to do handshake via TLS @@ -659,12 +660,16 @@ def try_tls_accept unless @data_callback raise "connection callback must call #data to set data callback" end + return true + rescue IO::WaitReadable, IO::WaitWritable # retry accept_nonblock: there aren't enough data in underlying socket buffer rescue OpenSSL::SSL::SSLError => e @log.trace "unexpected error before accepting TLS connection", error: e close rescue nil end + + false end def on_connect @@ -672,11 +677,11 @@ def on_connect end def on_readable - if @_handler_accepted + if try_tls_accept super - else - try_tls_accept end + rescue IO::WaitReadable, IO::WaitWritable + # ignore and return with doing nothing end def on_writable From 00c2a2a2b05e25e11e15a547063a0aab63a99a13 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 18 Jan 2017 18:27:05 +0900 Subject: [PATCH 05/17] make internal APIs clean to generate server/ca certs on it --- lib/fluent/plugin_helper/cert_option.rb | 37 +++++++++++++++++-------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/lib/fluent/plugin_helper/cert_option.rb b/lib/fluent/plugin_helper/cert_option.rb index 0004667d20..6742142175 100644 --- a/lib/fluent/plugin_helper/cert_option.rb +++ b/lib/fluent/plugin_helper/cert_option.rb @@ -22,7 +22,7 @@ module Fluent module PluginHelper module CertOption def cert_option_create_context(version, insecure, ciphers, conf) - cert, key, extra_chain_certs = cert_option_server_validate!(conf) + cert, key, extra = cert_option_server_validate!(conf) ctx = OpenSSL::SSL::SSLContext.new(version) unless insecure @@ -35,8 +35,8 @@ def cert_option_create_context(version, insecure, ciphers, conf) ctx.cert = cert ctx.key = key - if extra_chain_certs - ctx.extra_chain_cert = extra_chain_certs + if extra && !extra.empty? + ctx.extra_chain_cert = extra end ctx @@ -56,7 +56,7 @@ def cert_option_server_validate!(conf) cert_option_generate_server_pair_by_ca( conf.ca_cert_path, conf.ca_private_key_path, - conf.private_key_passphrase, + conf.ca_private_key_passphrase, generate_opts ) @@ -74,8 +74,7 @@ def cert_option_load(cert_path, private_key_path, private_key_passphrase) key = OpenSSL::PKey::RSA.new(File.read(private_key_path), private_key_passphrase) certs = cert_option_certificates_from_file(cert_path) cert = certs.shift - extra_chain_certs = certs - return cert, key, extra_chain_certs + return cert, key, certs end def cert_option_cert_generation_opts_from_conf(conf) @@ -90,7 +89,7 @@ def cert_option_cert_generation_opts_from_conf(conf) } end - def cert_option_generate_server_pair(opts, issuer = nil) + def cert_option_generate_pair(opts, issuer = nil) key = OpenSSL::PKey::RSA.generate(opts[:private_key_length]) subject = OpenSSL::X509::Name.new @@ -109,25 +108,41 @@ def cert_option_generate_server_pair(opts, issuer = nil) cert.issuer = issuer cert.subject = subject + return cert, key + end + + def cert_option_generate_ca_pair_self_signed(generate_opts) + cert, key = cert_option_generate_pair(generate_opts) + # basicConstraints: this cert is for CA or not - cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(false)])) - cert.add_extension OpenSSL::X509::Extension.new('nsCertType', 'server') + cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(true)])) + cert.sign(key, generate_opts[:digest].to_s) return cert, key end def cert_option_generate_server_pair_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, generate_opts) ca_key = OpenSSL::PKey::RSA.new(File.read(ca_key_path), ca_key_passphrase) ca_cert = OpenSSL::X509::Certificate.new(File.read(ca_cert_path)) - cert, key = cert_option_generate_server_pair(generate_opts, ca_cert.issuer) + cert, key = cert_option_generate_pair(generate_opts, ca_cert.subject) raise "BUG: certificate digest algorithm not set" unless generate_opts[:digest] + + # basicConstraints: this cert is for CA or not + cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(false)])) + cert.add_extension OpenSSL::X509::Extension.new('nsCertType', 'server') + cert.sign(ca_key, generate_opts[:digest].to_s) return cert, key, nil end def cert_option_generate_server_pair_self_signed(generate_opts) - cert, key = cert_option_generate_server_pair(generate_opts) + cert, key = cert_option_generate_pair(generate_opts) raise "BUG: certificate digest algorithm not set" unless generate_opts[:digest] + + # basicConstraints: this cert is for CA or not + cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(false)])) + cert.add_extension OpenSSL::X509::Extension.new('nsCertType', 'server') + cert.sign(key, generate_opts[:digest].to_s) return cert, key, nil end From 30d00f7b33943e2edf93c7a960c7b81dc44b7e5a Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 18 Jan 2017 18:27:46 +0900 Subject: [PATCH 06/17] add tests about TLS transport --- test/plugin_helper/test_server.rb | 667 +++++++++++++++++++++++++++++- 1 file changed, 646 insertions(+), 21 deletions(-) diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index b88d606f5a..d78410040d 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/plugin_helper/server' +require 'fluent/plugin_helper/cert_option' # to create certs for tests require 'fluent/plugin/base' require 'timeout' @@ -11,6 +12,8 @@ class Dummy < Fluent::Plugin::TestBase helpers :server end + TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/plugin_helper_server") + PORT = unused_port setup do @@ -734,36 +737,658 @@ class Dummy < Fluent::Plugin::TestBase end end - def create_ca + module CertUtil + extend Fluent::PluginHelper::CertOption + end + + def create_ca_options + { + private_key_length: 2048, + country: 'US', + state: 'CA', + locality: 'Mountain View', + common_name: 'ca.testing.fluentd.org', + expiration: 30 * 86400, + digest: :sha256, + } + end + + def create_server_options + { + private_key_length: 2048, + country: 'US', + state: 'CA', + locality: 'Mountain View', + common_name: 'server.testing.fluentd.org', + expiration: 30 * 86400, + digest: :sha256, + } + end + + def write_cert_and_key(cert_path, cert, key_path, key, passphrase) + File.open(cert_path, "w"){|f| f.write(cert.to_pem) } + # Encrypt secret key by AES256, and write it in PEM format + File.open(key_path, "w"){|f| f.write(key.export(OpenSSL::Cipher.new("AES-256-CBC"), passphrase)) } + File.chmod(0600, cert_path, key_path) end - def create_ca_chained + def create_server_pair_signed_by_self(cert_path, private_key_path, passphrase) + cert, key, _ = CertUtil.cert_option_generate_server_pair_self_signed(create_server_options) + write_cert_and_key(cert_path, cert, private_key_path, key, passphrase) end - def create_server_pair_signed_by_ca + def create_ca_pair_signed_by_self(cert_path, private_key_path, passphrase) + cert, key, _ = CertUtil.cert_option_generate_ca_pair_self_signed(create_ca_options) + write_cert_and_key(cert_path, cert, private_key_path, key, passphrase) + end + + def create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, passphrase) + cert, key, _ = CertUtil.cert_option_generate_server_pair_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, create_server_options) + write_cert_and_key(cert_path, cert, private_key_path, key, passphrase) + end + + def create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, passphrase) + root_cert, root_key, _ = CertUtil.cert_option_generate_ca_pair_self_signed(create_ca_options) + write_cert_and_key(ca_cert_path, root_cert, ca_key_path, root_key, ca_key_passphrase) + + intermediate_ca_options = create_ca_options + intermediate_ca_options[:common_name] = 'ca2.testing.fluentd.org' + chain_cert, chain_key = CertUtil.cert_option_generate_pair(intermediate_ca_options, root_cert.subject) + chain_cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(true)])) + chain_cert.sign(root_key, "sha256") + + server_cert, server_key, _ = CertUtil.cert_option_generate_pair(create_server_options, chain_cert.subject) + server_cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(false)])) + server_cert.add_extension OpenSSL::X509::Extension.new('nsCertType', 'server') + server_cert.sign(chain_key, "sha256") + + # write chained cert + File.open(cert_path, "w") do |f| + f.write server_cert.to_pem + f.write chain_cert.to_pem + end + File.open(private_key_path, "w"){|f| f.write(server_key.export(OpenSSL::Cipher.new("AES-256-CBC"), passphrase)) } + File.chmod(0600, cert_path, private_key_path) + end + + def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, hostname: nil) + context = OpenSSL::SSL::SSLContext.new + context.set_params({}) + if verify + cert_store = OpenSSL::X509::Store.new + cert_store.set_default_paths + if selfsigned && OpenSSL::X509.const_defined?('V_FLAG_CHECK_SS_SIGNATURE') + cert_store.flags = OpenSSL::X509::V_FLAG_CHECK_SS_SIGNATURE + end + if cert_path + cert_store.add_file(cert_path) + end + context.verify_mode = OpenSSL::SSL::VERIFY_PEER + context.cert_store = cert_store + if !hostname && context.respond_to?(:verify_hostname=) + context.verify_hostname = false # In test code, using hostname to be connected is very difficult + end + else + context.verify_mode = OpenSSL::SSL::VERIFY_NONE + end + + sock = OpenSSL::SSL::SSLSocket.new(TCPSocket.new(addr, port), context) + sock.hostname = hostname if hostname && sock.respond_to?(:hostname) + sock.connect + yield sock + ensure + sock.close rescue nil end - def create_server_pair_signed_by_self + sub_test_case '#server_create_tls with various certificate options' do + setup do + @d = Dummy.new # to get plugin not configured/started yet + + @certs_dir = File.join(TMP_DIR, "tls_certs") + @server_cert_dir = File.join(@certs_dir, "server") + FileUtils.rm_rf @certs_dir + FileUtils.mkdir_p @server_cert_dir + end + + sub_test_case 'using tls_options arguments to specify cert options' do + setup do + @d.configure(config_element()); @d.start; @d.after_start + end + + test 'create dynamic self-signed cert/key pair (without any verification from clients)' do + # insecure + tls_options = { + protocol: :tls, + version: 'TLSv1_2', + ciphers: 'ALL:!aNULL:!eNULL:!SSLv2', + insecure: true, + generate_private_key_length: 2048, + generate_cert_country: 'US', + generate_cert_state: 'CA', + generate_cert_locality: 'Mountain View', + generate_cert_common_name: 'myserver.testing.fluentd.org', + generate_cert_expiration: 10 * 365 * 86400, + generate_cert_digest: :sha256, + } + + received = "" + @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn| + received << data + end + assert_raise "" do + open_tls_session('127.0.0.1', PORT) do |sock| + sock.post_connection_check('myserver.testing.fluentd.org') + # cannot connect .... + end + end + open_tls_session('127.0.0.1', PORT, verify: false) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + + test 'load self-signed cert/key pair (files), verified from clients using cert files' do + cert_path = File.join(@server_cert_dir, "cert.pem") + private_key_path = File.join(@certs_dir, "server.key.pem") + private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay" + create_server_pair_signed_by_self(cert_path, private_key_path, private_key_passphrase) + + tls_options = { + protocol: :tls, + version: 'TLSv1_2', + ciphers: 'ALL:!aNULL:!eNULL:!SSLv2', + insecure: false, + cert_path: cert_path, + private_key_path: private_key_path, + private_key_passphrase: private_key_passphrase, + } + received = "" + @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn| + received << data + end + assert_raise "" do + open_tls_session('127.0.0.1', PORT) do |sock| + sock.post_connection_check('server.testing.fluentd.org') + # cannot connect by failing verification without server cert + end + end + open_tls_session('127.0.0.1', PORT, cert_path: cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + + test 'create dynamic server cert by private CA cert file, verified from clients using CA cert file' do + ca_cert_path = File.join(@certs_dir, "ca_cert.pem") + ca_key_path = File.join(@certs_dir, "ca.key.pem") + ca_key_passphrase = "fooooooooooooooooooooooooo" + create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase) + + tls_options = { + protocol: :tls, + version: 'TLSv1_2', + ciphers: 'ALL:!aNULL:!eNULL:!SSLv2', + insecure: false, + ca_cert_path: ca_cert_path, + ca_private_key_path: ca_key_path, + ca_private_key_passphrase: ca_key_passphrase, + generate_private_key_length: 2048, + } + received = "" + @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn| + received << data + end + open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + + test 'load static server cert by private CA cert file, verified from clients using CA cert file' do + ca_cert_path = File.join(@certs_dir, "ca_cert.pem") + ca_key_path = File.join(@certs_dir, "ca.key.pem") + ca_key_passphrase = "foooooooo" + create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase) + + cert_path = File.join(@server_cert_dir, "cert.pem") + private_key_path = File.join(@certs_dir, "server.key.pem") + private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay" + create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase) + + tls_options = { + protocol: :tls, + version: 'TLSv1_2', + ciphers: 'ALL:!aNULL:!eNULL:!SSLv2', + insecure: false, + cert_path: cert_path, + private_key_path: private_key_path, + private_key_passphrase: private_key_passphrase, + } + received = "" + @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn| + received << data + end + open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + + test 'load chained server cert by private CA cert file, verified from clients using CA cert file as root' do + ca_cert_path = File.join(@certs_dir, "ca_cert.pem") + ca_key_path = File.join(@certs_dir, "ca.key.pem") + ca_key_passphrase = "foooooooo" + cert_path = File.join(@server_cert_dir, "cert.pem") + private_key_path = File.join(@certs_dir, "server.key.pem") + private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay" + create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase) + + tls_options = { + protocol: :tls, + version: 'TLSv1_2', + ciphers: 'ALL:!aNULL:!eNULL:!SSLv2', + insecure: false, + cert_path: cert_path, + private_key_path: private_key_path, + private_key_passphrase: private_key_passphrase, + } + received = "" + @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn| + received << data + end + open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + end + + sub_test_case 'using configurations to specify cert options' do + test 'create dynamic self-signed cert/key pair (without any verification from clients)' do + # insecure + transport_opts = { + 'insecure' => 'true', + } + transport_conf = config_element('transport', 'tls', transport_opts) + conf = config_element('match', 'tag.*', {}, [transport_conf]) + + @d.configure(conf); @d.start; @d.after_start + + received = "" + @d.server_create_tls(:s, PORT) do |data, conn| + received << data + end + assert_raise "" do + open_tls_session('127.0.0.1', PORT) do |sock| + sock.post_connection_check('myserver.testing.fluentd.org') + # cannot connect .... + end + end + open_tls_session('127.0.0.1', PORT, verify: false) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + + test 'load self-signed cert/key pair (files), verified from clients using cert files' do + cert_path = File.join(@server_cert_dir, "cert.pem") + private_key_path = File.join(@certs_dir, "server.key.pem") + private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay" + create_server_pair_signed_by_self(cert_path, private_key_path, private_key_passphrase) + + transport_opts = { + 'cert_path' => cert_path, + 'private_key_path' => private_key_path, + 'private_key_passphrase' => private_key_passphrase, + } + transport_conf = config_element('transport', 'tls', transport_opts) + conf = config_element('match', 'tag.*', {}, [transport_conf]) + + @d.configure(conf); @d.start; @d.after_start + + received = "" + @d.server_create_tls(:s, PORT) do |data, conn| + received << data + end + assert_raise "" do + open_tls_session('127.0.0.1', PORT) do |sock| + sock.post_connection_check('server.testing.fluentd.org') + # cannot connect by failing verification without server cert + end + end + open_tls_session('127.0.0.1', PORT, cert_path: cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + + test 'create dynamic server cert by private CA cert file, verified from clients using CA cert file' do + ca_cert_path = File.join(@certs_dir, "ca_cert.pem") + ca_key_path = File.join(@certs_dir, "ca.key.pem") + ca_key_passphrase = "fooooooooooooooooooooooooo" + create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase) + + transport_opts = { + 'ca_cert_path' => ca_cert_path, + 'ca_private_key_path' => ca_key_path, + 'ca_private_key_passphrase' => ca_key_passphrase, + } + transport_conf = config_element('transport', 'tls', transport_opts) + conf = config_element('match', 'tag.*', {}, [transport_conf]) + + @d.configure(conf); @d.start; @d.after_start + + received = "" + @d.server_create_tls(:s, PORT) do |data, conn| + received << data + end + open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + + test 'load static server cert by private CA cert file, verified from clients using CA cert file' do + ca_cert_path = File.join(@certs_dir, "ca_cert.pem") + ca_key_path = File.join(@certs_dir, "ca.key.pem") + ca_key_passphrase = "foooooooo" + create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase) + + cert_path = File.join(@server_cert_dir, "cert.pem") + private_key_path = File.join(@certs_dir, "server.key.pem") + private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay" + create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase) + + transport_opts = { + 'cert_path' => cert_path, + 'private_key_path' => private_key_path, + 'private_key_passphrase' => private_key_passphrase, + } + transport_conf = config_element('transport', 'tls', transport_opts) + conf = config_element('match', 'tag.*', {}, [transport_conf]) + + @d.configure(conf); @d.start; @d.after_start + + received = "" + @d.server_create_tls(:s, PORT) do |data, conn| + received << data + end + open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + + test 'load chained server cert by private CA cert file, verified from clients using CA cert file as root' do + ca_cert_path = File.join(@certs_dir, "ca_cert.pem") + ca_key_path = File.join(@certs_dir, "ca.key.pem") + ca_key_passphrase = "foooooooo" + cert_path = File.join(@server_cert_dir, "cert.pem") + private_key_path = File.join(@certs_dir, "server.key.pem") + private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay" + create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase) + + transport_opts = { + 'cert_path' => cert_path, + 'private_key_path' => private_key_path, + 'private_key_passphrase' => private_key_passphrase, + } + transport_conf = config_element('transport', 'tls', transport_opts) + conf = config_element('match', 'tag.*', {}, [transport_conf]) + + @d.configure(conf); @d.start; @d.after_start + + received = "" + @d.server_create_tls(:s, PORT) do |data, conn| + received << data + end + open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 8 } + assert_equal "yay\nfoo\n", received + end + end end sub_test_case '#server_create_tls' do - test 'can accept all keyword arguments valid for tcp/tls server' - test 'creates a tls server just to read data' - test 'creates a tls server to read and write data' - test 'creates a tls server to read and write data using IPv6' - - # TODO: many tests about tls_options - # TODO: many tests about sections - - test 'does not resolve name of client address in default' - test 'does resolve name of client address if resolve_name is true' - # test 'can keep connections alive for tls if keepalive specified' do - # pend "not implemented yet" - # end - - test 'raises error if plugin registers data callback for connection object from #server_create' - test 'can call write_complete callback if registered' - test 'can call close callback if registered' + setup do + @certs_dir = File.join(TMP_DIR, "tls_certs") + FileUtils.rm_rf @certs_dir + FileUtils.mkdir_p @certs_dir + + @server_cert_dir = File.join(@certs_dir, "server") + FileUtils.mkdir_p @server_cert_dir + + @cert_path = File.join(@server_cert_dir, "cert.pem") + private_key_path = File.join(@certs_dir, "server.key.pem") + private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay" + create_server_pair_signed_by_self(@cert_path, private_key_path, private_key_passphrase) + + @default_hostname = ::Socket.gethostname + + @tls_options = { + protocol: :tls, + version: 'TLSv1_2', + ciphers: 'ALL:!aNULL:!eNULL:!SSLv2', + insecure: false, + cert_path: @cert_path, + private_key_path: private_key_path, + private_key_passphrase: private_key_passphrase, + } + end + + test 'can accept all keyword arguments valid for tcp/tls server' do + assert_nothing_raised do + @d.server_create_tls(:s, PORT, bind: '127.0.0.1', shared: false, resolve_name: true, linger_timeout: 10, backlog: 500, tls_options: @tls_options) do |data, conn| + # ... + end + end + end + + test 'creates a tls server just to read data' do + received = "" + @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn| + received << data + end + 3.times do + open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + end + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + end + + test 'creates a tls server to read and write data' do + received = "" + responses = [] + @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn| + received << data + conn.write "ack\n" + end + 3.times do + # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| + open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + responses << sock.readline + end + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal ["ack\n","ack\n","ack\n"], responses + end + + test 'creates a tls server to read and write data using IPv6' do + omit "IPv6 unavailable here" unless ipv6_enabled? + + received = "" + responses = [] + @d.server_create_tls(:s, PORT, bind: "::1", tls_options: @tls_options) do |data, conn| + received << data + conn.write "ack\n" + end + 3.times do + # open_tls_session('::1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| + open_tls_session('::1', PORT, cert_path: @cert_path) do |sock| + sock.puts "yay" + sock.puts "foo" + responses << sock.readline + end + end + waiting(10){ sleep 0.1 until received.bytesize == 24 } + assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal ["ack\n","ack\n","ack\n"], responses + end + + test 'does not resolve name of client address in default' do + received = "" + sources = [] + @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn| + received << data + sources << conn.remote_host + end + 3.times do + # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| + open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| + sock.puts "yay" + end + end + waiting(10){ sleep 0.1 until received.bytesize == 12 } + assert_equal "yay\nyay\nyay\n", received + assert{ sources.all?{|s| s == "127.0.0.1" } } + end + + test 'does resolve name of client address if resolve_name is true' do + hostname = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"])[0] + + received = "" + sources = [] + @d.server_create_tls(:s, PORT, resolve_name: true, tls_options: @tls_options) do |data, conn| + received << data + sources << conn.remote_host + end + 3.times do + # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| + open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| + sock.puts "yay" + end + end + waiting(10){ sleep 0.1 until received.bytesize == 12 } + assert_equal "yay\nyay\nyay\n", received + assert{ sources.all?{|s| s == hostname } } + end + + test 'can keep connections alive for tls if keepalive specified' do + # pend "not implemented yet" + end + + test 'raises error if plugin registers data callback for connection object from #server_create' do + received = "" + errors = [] + @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn| + received << data + begin + conn.data{|d| received << d.upcase } + rescue => e + errors << e + end + end + # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| + open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| + sock.puts "foo" + end + waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 } + assert_equal "foo\n", received + assert_equal 1, errors.size + assert_equal "data callback can be registered just once, but registered twice", errors.first.message + end + + test 'can call write_complete callback if registered' do + buffer = "" + lines = [] + responses = [] + response_completes = [] + @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn| + conn.on(:write_complete){|c| response_completes << true } + buffer << data + if idx = buffer.index("\n") + lines << buffer.slice!(0,idx+1) + conn.write "ack\n" + end + end + 3.times do + # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| + open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| + sock.write "yay" + sock.write "foo\n" + begin + responses << sock.readline + rescue EOFError, IOError, Errno::ECONNRESET + # ignore + end + sock.close + end + end + waiting(10){ sleep 0.1 until lines.size == 3 && response_completes.size == 3 } + assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines + assert_equal ["ack\n","ack\n","ack\n"], responses + assert_equal [true, true, true], response_completes + end + + test 'can call close callback if registered' do + buffer = "" + lines = [] + callback_results = [] + @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn| + conn.on(:close){|c| callback_results << "closed" } + buffer << data + if idx = buffer.index("\n") + lines << buffer.slice!(0,idx+1) + conn.write "ack\n" + end + end + 3.times do + # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| + open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| + sock.write "yay" + sock.write "foo\n" + begin + while line = sock.readline + if line == "ack\n" + sock.close + end + end + rescue EOFError, IOError, Errno::ECONNRESET + # ignore + end + end + end + waiting(10){ sleep 0.1 until lines.size == 3 && callback_results.size == 3 } + assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines + assert_equal ["closed", "closed", "closed"], callback_results + end end sub_test_case '#server_create_unix' do From 43d16560bb4824e01f0a1d1f224be0cf325db1fe Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 18 Jan 2017 18:28:23 +0900 Subject: [PATCH 07/17] add TLS support --- lib/fluent/plugin_helper/socket.rb | 144 ++++++++++++++++++++++++++++- 1 file changed, 140 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin_helper/socket.rb b/lib/fluent/plugin_helper/socket.rb index 87a5aaab72..42fa57efce 100644 --- a/lib/fluent/plugin_helper/socket.rb +++ b/lib/fluent/plugin_helper/socket.rb @@ -16,6 +16,7 @@ require 'socket' require 'ipaddr' +require 'openssl' require_relative 'socket_option' @@ -29,6 +30,12 @@ module Socket include Fluent::PluginHelper::SocketOption + TLS_DEFAULT_VERSION = :'TLSv1_2' + TLS_SUPPORTED_VERSIONS = [:'TLSv1_1', :'TLSv1_2'] + ### follow httpclient configuration by nahi + # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH" + CIPHERS_DEFAULT = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default + attr_reader :_sockets # for tests # TODO: implement connection pool for specified host @@ -49,7 +56,7 @@ def socket_create(proto, host, port, **kwargs, &block) end def socket_create_tcp(host, port, resolve_name: false, **kwargs, &block) - sock = TCPSocket.new(host, port) + sock = WrappedSocket::TCP.new(host, port) socket_option_set(sock, resolve_name: resolve_name, **kwargs) if block begin @@ -65,7 +72,7 @@ def socket_create_tcp(host, port, resolve_name: false, **kwargs, &block) def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block) family = IPAddr.new(IPSocket.getaddress(host)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6 - sock = UDPSocket.new(family) + sock = WrappedSocket::UDP.new(family) socket_option_set(sock, resolve_name: resolve_name, **kwargs) sock.connect(host, port) if connect if block @@ -79,12 +86,141 @@ def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, end end - def socket_create_tls(host, port, resolve_name: false, certopts: {}, &block) - raise "not implemented yet" + def socket_create_tls( + host, port, + version: TLS_DEFAULT_VERSION, ciphers: CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil, + enable_system_cert_store: true, allow_self_signed_cert: false, cert_path: nil, **kwargs, &block) + + host_is_ipaddress = IPAddr.new(host) rescue false + fqdn ||= host unless host_is_ipaddress + + context = OpenSSL::SSL::SSLContext.new(version) + + if insecure + log.trace "setting TLS verify_mode NONE" + context.verify_mode = OpenSSL::SSL::VERIFY_NONE + else + cert_store = OpenSSL::X509::Store.new + if allow_self_signed_cert && OpenSSL::X509.const_defined?('V_FLAG_CHECK_SS_SIGNATURE') + cert_store.flags = OpenSSL::X509::V_FLAG_CHECK_SS_SIGNATURE + end + begin + if enable_system_cert_store + log.trace "loading system default certificate store" + cert_store.set_default_paths + end + rescue OpenSSL::X509::StoreError + log.warn "failed to load system default certificate store", error: e + end + if ca_cert_path + log.trace "adding CA cert", path: ca_cert_path + cert_store.add_file(ca_cert_path) + end + if server_cert_dir + log.trace "adding server cert directory", path: server_cert_dir + cert_store.add_path(server_cert_dir) + end + + log.trace "setting TLS context", mode: "peer", ciphers: ciphers + context.set_params({}) + context.ciphers = ciphers + context.verify_mode = OpenSSL::SSL::VERIFY_PEER + context.cert_store = cert_store + context.verify_hostname = true if verify_fqdn && fqdn && context.respond_to?(:verify_hostname=) + end + + tcpsock = socket_create_tcp(host, port, **kwargs) + sock = WrappedSocket::TLS.new(tcpsock, context) + sock.sync_close = true + sock.hostname = fqdn if verify_fqdn && fqdn && sock.respond_to?(:hostname=) + + log.trace "entering TLS handshake" + sock.connect + + begin + if verify_fqdn + log.trace "checking peer's certificate", subject: sock.peer_cert.subject + sock.post_connection_check(fqdn) + verify = sock.verify_result + if verify != OpenSSL::X509::V_OK + err_name = Socket.tls_verify_result_name(verify) + log.warn "BUG: failed to verify certification while connecting (but not raised, why?)", host: host, fqdn: fqdn, error: err_name + raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{host} as #{fqdn}" + end + end + rescue OpenSSL::SSL::SSLError => e + log.warn "failed to verify certification while connecting tls session", host: host, fqdn: fqdn, error: e + raise + end + + if block + begin + block.call(sock) + ensure + sock.close rescue nil + end + else + sock + end + end + + def self.tls_verify_result_name(code) + case code + when OpenSSL::X509::V_OK then 'V_OK' + when OpenSSL::X509::V_ERR_AKID_SKID_MISMATCH then 'V_ERR_AKID_SKID_MISMATCH' + when OpenSSL::X509::V_ERR_APPLICATION_VERIFICATION then 'V_ERR_APPLICATION_VERIFICATION' + when OpenSSL::X509::V_ERR_CERT_CHAIN_TOO_LONG then 'V_ERR_CERT_CHAIN_TOO_LONG' + when OpenSSL::X509::V_ERR_CERT_HAS_EXPIRED then 'V_ERR_CERT_HAS_EXPIRED' + when OpenSSL::X509::V_ERR_CERT_NOT_YET_VALID then 'V_ERR_CERT_NOT_YET_VALID' + when OpenSSL::X509::V_ERR_CERT_REJECTED then 'V_ERR_CERT_REJECTED' + when OpenSSL::X509::V_ERR_CERT_REVOKED then 'V_ERR_CERT_REVOKED' + when OpenSSL::X509::V_ERR_CERT_SIGNATURE_FAILURE then 'V_ERR_CERT_SIGNATURE_FAILURE' + when OpenSSL::X509::V_ERR_CERT_UNTRUSTED then 'V_ERR_CERT_UNTRUSTED' + when OpenSSL::X509::V_ERR_CRL_HAS_EXPIRED then 'V_ERR_CRL_HAS_EXPIRED' + when OpenSSL::X509::V_ERR_CRL_NOT_YET_VALID then 'V_ERR_CRL_NOT_YET_VALID' + when OpenSSL::X509::V_ERR_CRL_SIGNATURE_FAILURE then 'V_ERR_CRL_SIGNATURE_FAILURE' + when OpenSSL::X509::V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT then 'V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT' + when OpenSSL::X509::V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD then 'V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD' + when OpenSSL::X509::V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD then 'V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD' + when OpenSSL::X509::V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD then 'V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD' + when OpenSSL::X509::V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD then 'V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD' + when OpenSSL::X509::V_ERR_INVALID_CA then 'V_ERR_INVALID_CA' + when OpenSSL::X509::V_ERR_INVALID_PURPOSE then 'V_ERR_INVALID_PURPOSE' + when OpenSSL::X509::V_ERR_KEYUSAGE_NO_CERTSIGN then 'V_ERR_KEYUSAGE_NO_CERTSIGN' + when OpenSSL::X509::V_ERR_OUT_OF_MEM then 'V_ERR_OUT_OF_MEM' + when OpenSSL::X509::V_ERR_PATH_LENGTH_EXCEEDED then 'V_ERR_PATH_LENGTH_EXCEEDED' + when OpenSSL::X509::V_ERR_SELF_SIGNED_CERT_IN_CHAIN then 'V_ERR_SELF_SIGNED_CERT_IN_CHAIN' + when OpenSSL::X509::V_ERR_SUBJECT_ISSUER_MISMATCH then 'V_ERR_SUBJECT_ISSUER_MISMATCH' + when OpenSSL::X509::V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY then 'V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY' + when OpenSSL::X509::V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE then 'V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY' + when OpenSSL::X509::V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE then 'V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE' + when OpenSSL::X509::V_ERR_UNABLE_TO_GET_CRL then 'V_ERR_UNABLE_TO_GET_CRL' + when OpenSSL::X509::V_ERR_UNABLE_TO_GET_ISSUER_CERT then 'V_ERR_UNABLE_TO_GET_ISSUER_CERT' + when OpenSSL::X509::V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY then 'V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY' + when OpenSSL::X509::V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE then 'V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE' + end end # socket_create_socks ? + module WrappedSocket + class TCP < ::TCPSocket + def remote_addr; peeraddr[3]; end + def remote_host; peeraddr[2]; end + def remote_port; peeraddr[1]; end + end + class UDP < ::UDPSocket + def remote_addr; peeraddr[3]; end + def remote_host; peeraddr[2]; end + def remote_port; peeraddr[1]; end + end + class TLS < OpenSSL::SSL::SSLSocket + def remote_addr; peeraddr[3]; end + def remote_host; peeraddr[2]; end + def remote_port; peeraddr[1]; end + end + end + def initialize super # @_sockets = [] # for keepalived sockets / connection pool From 876b295eb83c3997517c22c55b3deeab087461ad Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 18 Jan 2017 19:46:49 +0900 Subject: [PATCH 08/17] add TODO for cheaper cost TLS --- lib/fluent/plugin_helper/server.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index a2a9fae468..ffdb57dab1 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -35,6 +35,7 @@ module Server # This plugin helper doesn't support these things for now: # * TCP/TLS keepalive + # * TLS session cache/tickets # * unix domain sockets # stop : [-] From 3198105403f00160ab6466a121422880c444ecde Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 18 Jan 2017 19:47:19 +0900 Subject: [PATCH 09/17] make it possible to read 2 or more CA certs --- lib/fluent/plugin_helper/socket.rb | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin_helper/socket.rb b/lib/fluent/plugin_helper/socket.rb index 42fa57efce..b751fe7b85 100644 --- a/lib/fluent/plugin_helper/socket.rb +++ b/lib/fluent/plugin_helper/socket.rb @@ -89,7 +89,7 @@ def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, def socket_create_tls( host, port, version: TLS_DEFAULT_VERSION, ciphers: CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil, - enable_system_cert_store: true, allow_self_signed_cert: false, cert_path: nil, **kwargs, &block) + enable_system_cert_store: true, allow_self_signed_cert: false, cert_paths: nil, **kwargs, &block) host_is_ipaddress = IPAddr.new(host) rescue false fqdn ||= host unless host_is_ipaddress @@ -112,13 +112,17 @@ def socket_create_tls( rescue OpenSSL::X509::StoreError log.warn "failed to load system default certificate store", error: e end - if ca_cert_path - log.trace "adding CA cert", path: ca_cert_path - cert_store.add_file(ca_cert_path) - end - if server_cert_dir - log.trace "adding server cert directory", path: server_cert_dir - cert_store.add_path(server_cert_dir) + if cert_paths + if cert_paths.respond_to?(:each) + cert_paths.each do |cert_path| + log.trace "adding CA cert", path: cert_path + cert_store.add_file(cert_path) + end + else + cert_path = cert_paths + log.trace "adding CA cert", path: cert_path + cert_store.add_file(cert_path) + end end log.trace "setting TLS context", mode: "peer", ciphers: ciphers From 8cda02102456727ffc579b6e2cfa9bf7db1ed4c7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 18 Jan 2017 19:47:47 +0900 Subject: [PATCH 10/17] support TLS in in/out_forward and in_tcp --- lib/fluent/plugin/in_forward.rb | 2 +- lib/fluent/plugin/in_tcp.rb | 2 +- lib/fluent/plugin/out_forward.rb | 91 ++++++++++++++++++++++++++------ 3 files changed, 78 insertions(+), 17 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index b6786608c7..070d6e480f 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -154,7 +154,7 @@ def start shared_socket = system_config.workers > 1 - log.info "listening a tcp port", port: @port, bind: @bind + log.info "listening port", port: @port, bind: @bind server_create_connection( :in_forward_server, @port, bind: @bind, diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index 374ae3b6cc..428eb5258a 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -56,7 +56,7 @@ def start super @buffer = '' - server_create(:in_tcp_server, @port, proto: :tcp, bind: @bind) do |data, conn| + server_create(:in_tcp_server, @port, bind: @bind) do |data, conn| @buffer << data begin pos = 0 diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 848c8f2cc2..4f8bc682a3 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -33,12 +33,17 @@ class ConnectionClosedError < Error; end LISTEN_PORT = 24224 + desc 'The transport protocol.' + config_param :transport, :enum, list: [:tcp, :tls], default: :tcp + # TODO: TLS session cache/tickets + # TODO: Connection keepalive + desc 'The timeout time when sending event logs.' config_param :send_timeout, :time, default: 60 # TODO: add linger_timeout, recv_timeout - desc 'The transport protocol to use for heartbeats.(udp,tcp,none)' - config_param :heartbeat_type, :enum, list: [:tcp, :udp, :none], default: :tcp + desc 'The protocol to use for heartbeats (default is the same with "transport").' + config_param :heartbeat_type, :enum, list: [:transport, :udp, :none], default: :transport desc 'The interval of the heartbeat packer.' config_param :heartbeat_interval, :time, default: 1 desc 'The wait time before accepting a server fault recovery.' @@ -75,6 +80,19 @@ class ConnectionClosedError < Error; end desc 'Compress buffered data.' config_param :compress, :enum, list: [:text, :gzip], default: :text + desc 'The default version of TLS transport.' + config_param :tls_version, :enum, list: Fluent::PluginHelper::Socket::TLS_SUPPORTED_VERSIONS, default: Fluent::PluginHelper::Socket::TLS_DEFAULT_VERSION + desc 'The cipher configuration of TLS transport.' + config_param :tls_ciphers, :string, default: Fluent::PluginHelper::Socket::CIPHERS_DEFAULT + desc 'Skip all verification of certificates or not.' + config_param :tls_insecure_mode, :bool, default: false + desc 'Allow self signed certificates or not.' + config_param :tls_allow_self_signed_cert, :bool, default: false + desc 'Verify hostname of servers and certificates or not in TLS transport.' + config_param :tls_verify_hostname, :bool, default: true + desc 'The additional CA certificate path for TLS.' + config_param :tls_cert_path, :array, value_type: :string, default: nil + config_section :security, required: false, multi: false do desc 'The hostname' config_param :self_hostname, :string @@ -85,7 +103,7 @@ class ConnectionClosedError < Error; end config_section :server, param_name: :servers do desc "The IP address or host name of the server." config_param :host, :string - desc "The name of the server. Used in log messages." + desc "The name of the server. Used for logging and certificate verification in TLS transport (when host is address)." config_param :name, :string, default: nil desc "The port number of the host." config_param :port, :integer, default: LISTEN_PORT @@ -138,7 +156,22 @@ def configure(conf) if @dns_round_robin if @heartbeat_type == :udp - raise Fluent::ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option" + raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option" + end + end + + if @transport == :tls + if @tls_cert_path && !@tls_cert_path.empty? + @tls_cert_path.each do |path| + raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path) + raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.exist?(path) + end + end + + if @tls_insecure_mode + log.warn "TLS transport is configured in insecure way" + @tls_verify_hostname = false + @tls_allow_self_signed_cert = true end end @@ -275,14 +308,34 @@ def select_a_healthy_node raise NoNodesAvailable, "no nodes are available" end - def create_transfer_socket(host, port, &block) - socket_create_tcp( - host, port, - linger_timeout: @send_timeout, - send_timeout: @send_timeout, - recv_timeout: @ack_response_timeout, - &block - ) + def create_transfer_socket(host, port, hostname, &block) + case @transport + when :tls + socket_create_tls( + host, port, + version: @tls_version, + ciphers: @tls_ciphers, + insecure: @tls_insecure_mode, + verify_fqdn: @tls_verify_hostname, + fqdn: hostname, + allow_self_signed_cert: @tls_allow_self_signed_cert, + cert_paths: @tls_cert_path, + linger_timeout: @send_timeout, + send_timeout: @send_timeout, + recv_timeout: @ack_response_timeout, + &block + ) + when :tcp + socket_create_tcp( + host, port, + linger_timeout: @send_timeout, + send_timeout: @send_timeout, + recv_timeout: @ack_response_timeout, + &block + ) + else + raise "BUG: unknown transport protocol #{@transport}" + end end # MessagePack FixArray length is 3 @@ -458,6 +511,14 @@ def initialize(sender, server, failure:) @available = true @state = nil + # @hostname is used for certificate verification & TLS SNI + host_is_hostname = !(IPAddr.new(@host) rescue false) + @hostname = case + when host_is_hostname then @host + when @name then @name + else nil + end + @usock = nil @username = server.username @@ -557,7 +618,7 @@ def send_data_actual(sock, tag, chunk) end def send_data(tag, chunk) - sock = @sender.create_transfer_socket(resolved_host, port) + sock = @sender.create_transfer_socket(resolved_host, port, @hostname) begin send_data_actual(sock, tag, chunk) rescue @@ -589,8 +650,8 @@ def send_heartbeat end case @sender.heartbeat_type - when :tcp - @sender.create_transfer_socket(dest_addr, port) do |sock| + when :transport + @sender.create_transfer_socket(dest_addr, port, @hostname) do |sock| ## don't send any data to not cause a compatibility problem # sock.write FORWARD_TCP_HEARTBEAT_DATA From 055ec94b85abdfe6b52bf2c466f674331b48c0eb Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 18 Jan 2017 19:48:03 +0900 Subject: [PATCH 11/17] add example configuration for TLS enabled --- example/in_forward_tls.conf | 14 ++++++++++++++ example/out_forward_tls.conf | 18 ++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 example/in_forward_tls.conf create mode 100644 example/out_forward_tls.conf diff --git a/example/in_forward_tls.conf b/example/in_forward_tls.conf new file mode 100644 index 0000000000..d6c9612b80 --- /dev/null +++ b/example/in_forward_tls.conf @@ -0,0 +1,14 @@ + + @type forward + port 24224 + + insecure true + + + + + @type stdout + # + # flush_interval 10s + # + diff --git a/example/out_forward_tls.conf b/example/out_forward_tls.conf new file mode 100644 index 0000000000..d6dfb8a719 --- /dev/null +++ b/example/out_forward_tls.conf @@ -0,0 +1,18 @@ + + @type dummy + tag test + + + + @type forward + transport tls + tls_insecure_mode true + + # first server + host 127.0.0.1 + port 24224 + + + flush_interval 0 + + From 6aabce534bc26ceb5abe28cd66a3065be7c35fd1 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 19 Jan 2017 11:18:58 +0900 Subject: [PATCH 12/17] fix not to be affected by thread scheduling --- test/plugin_helper/test_server.rb | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index d78410040d..81cd4cc136 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -1217,7 +1217,8 @@ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, end end waiting(10){ sleep 0.1 until received.bytesize == 24 } - assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal 3, received.scan("yay\n").size + assert_equal 3, received.scan("foo\n").size end test 'creates a tls server to read and write data' do @@ -1236,7 +1237,8 @@ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, end end waiting(10){ sleep 0.1 until received.bytesize == 24 } - assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal 3, received.scan("yay\n").size + assert_equal 3, received.scan("foo\n").size assert_equal ["ack\n","ack\n","ack\n"], responses end @@ -1258,7 +1260,8 @@ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, end end waiting(10){ sleep 0.1 until received.bytesize == 24 } - assert_equal "yay\nfoo\nyay\nfoo\nyay\nfoo\n", received + assert_equal 3, received.scan("yay\n").size + assert_equal 3, received.scan("foo\n").size assert_equal ["ack\n","ack\n","ack\n"], responses end @@ -1276,7 +1279,7 @@ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, end end waiting(10){ sleep 0.1 until received.bytesize == 12 } - assert_equal "yay\nyay\nyay\n", received + assert_equal 3, received.scan("yay\n").size assert{ sources.all?{|s| s == "127.0.0.1" } } end @@ -1296,7 +1299,7 @@ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, end end waiting(10){ sleep 0.1 until received.bytesize == 12 } - assert_equal "yay\nyay\nyay\n", received + assert_equal 3, received.scan("yay\n").size assert{ sources.all?{|s| s == hostname } } end @@ -1315,7 +1318,6 @@ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, errors << e end end - # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| sock.puts "foo" end @@ -1339,7 +1341,6 @@ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, end end 3.times do - # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| sock.write "yay" sock.write "foo\n" @@ -1370,7 +1371,6 @@ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, end end 3.times do - # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock| open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock| sock.write "yay" sock.write "foo\n" From 06487880bc6669b21b20e31a335e1fbfbe86f812 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 19 Jan 2017 11:19:48 +0900 Subject: [PATCH 13/17] take care about compatibility for existing configuration, and tests --- lib/fluent/plugin/out_forward.rb | 7 ++++++- test/plugin/test_out_forward.rb | 9 +++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 4f8bc682a3..9d9f071ecb 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -43,7 +43,7 @@ class ConnectionClosedError < Error; end # TODO: add linger_timeout, recv_timeout desc 'The protocol to use for heartbeats (default is the same with "transport").' - config_param :heartbeat_type, :enum, list: [:transport, :udp, :none], default: :transport + config_param :heartbeat_type, :enum, list: [:transport, :tcp, :udp, :none], default: :transport desc 'The interval of the heartbeat packer.' config_param :heartbeat_interval, :time, default: 1 desc 'The wait time before accepting a server fault recovery.' @@ -154,6 +154,11 @@ def configure(conf) @read_interval = @read_interval_msec / 1000.0 @recover_sample_size = @recover_wait / @heartbeat_interval + if @heartbeat_type == :tcp + log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead." + @heartbeat_type = :transport + end + if @dns_round_robin if @heartbeat_type == :udp raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option" diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 3d0082b1e6..f31c3c4508 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -72,7 +72,7 @@ def read_ack_from_sock(sock, unpacker) ]) nodes = d.instance.nodes assert_equal 60, d.instance.send_timeout - assert_equal :tcp, d.instance.heartbeat_type + assert_equal :transport, d.instance.heartbeat_type assert_equal 1, nodes.length node = nodes.first assert_equal "test", node.name @@ -119,8 +119,8 @@ def read_ack_from_sock(sock, unpacker) end end - test 'configure_dns_round_robin tcp' do - @d = d = create_driver(CONFIG + "\nheartbeat_type tcp\ndns_round_robin true") + test 'configure_dns_round_robin transport' do + @d = d = create_driver(CONFIG + "\nheartbeat_type transport\ndns_round_robin true") assert_equal true, d.instance.dns_round_robin end @@ -655,7 +655,8 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG usock = d.instance.instance_variable_get(:@usock) servers = d.instance.instance_variable_get(:@_servers) timers = d.instance.instance_variable_get(:@_timers) - assert_equal UDPSocket, usock.class + assert_equal Fluent::PluginHelper::Socket::WrappedSocket::UDP, usock.class + assert_kind_of UDPSocket, usock assert servers.find{|s| s.title == :out_forward_heartbeat_receiver } assert timers.include?(:out_forward_heartbeat_request) From 1090a1878dd76ff9db8d273d84a58206a65d9bae Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 19 Jan 2017 11:20:12 +0900 Subject: [PATCH 14/17] Travis-CI precise image do not support TLS v1.2 --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index a8c0ce3be4..ed12ba94cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -43,6 +43,7 @@ branches: - v0.14 sudo: false +dist: trusty # for TLSv1.2 support addons: apt: From bfdec2963492bed803cba141adfc92ff28b3de4a Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 20 Jan 2017 11:19:21 +0900 Subject: [PATCH 15/17] fix cert path validation mistake --- lib/fluent/plugin/out_forward.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 9d9f071ecb..fb9d642d1e 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -169,7 +169,7 @@ def configure(conf) if @tls_cert_path && !@tls_cert_path.empty? @tls_cert_path.each do |path| raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path) - raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.exist?(path) + raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path) end end From e68fb955cd188a99cb2b3909603113a4fbd71dd5 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 20 Jan 2017 13:53:54 +0900 Subject: [PATCH 16/17] fix to make test stable --- test/plugin_helper/test_server.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb index 81cd4cc136..18a4713105 100644 --- a/test/plugin_helper/test_server.rb +++ b/test/plugin_helper/test_server.rb @@ -469,7 +469,7 @@ class Dummy < Fluent::Plugin::TestBase end waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 } assert_equal "foo\n", received - assert_equal 1, errors.size + assert{ errors.size > 0 } # it might be called twice (or more) when connection was accepted, and then data arrived (or more) assert_equal "data callback can be registered just once, but registered twice", errors.first.message end From 9a011ddf16f5963f6910a8341461e0869d8cde20 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 20 Jan 2017 13:55:17 +0900 Subject: [PATCH 17/17] to identify deadlock test --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index ed12ba94cc..6aee403392 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: ruby cache: bundler -# script: bundle exec rake test TESTOPTS=-v +script: bundle exec rake test TESTOPTS=-v # http://rubies.travis-ci.org/ # See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c