diff --git a/.travis.yml b/.travis.yml
index a8c0ce3be4..6aee403392 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,7 +1,7 @@
language: ruby
cache: bundler
-# script: bundle exec rake test TESTOPTS=-v
+script: bundle exec rake test TESTOPTS=-v
# http://rubies.travis-ci.org/
# See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c
@@ -43,6 +43,7 @@ branches:
- v0.14
sudo: false
+dist: trusty # for TLSv1.2 support
addons:
apt:
diff --git a/example/in_forward_tls.conf b/example/in_forward_tls.conf
new file mode 100644
index 0000000000..d6c9612b80
--- /dev/null
+++ b/example/in_forward_tls.conf
@@ -0,0 +1,14 @@
+
+
+
+ @type stdout
+ #
+ # flush_interval 10s
+ #
+
diff --git a/example/out_forward_tls.conf b/example/out_forward_tls.conf
new file mode 100644
index 0000000000..d6dfb8a719
--- /dev/null
+++ b/example/out_forward_tls.conf
@@ -0,0 +1,18 @@
+
+
+
+ @type forward
+ transport tls
+ tls_insecure_mode true
+
+ # first server
+ host 127.0.0.1
+ port 24224
+
+
+ flush_interval 0
+
+
diff --git a/lib/fluent/config/section.rb b/lib/fluent/config/section.rb
index 81b8954b1a..a61cb055b7 100644
--- a/lib/fluent/config/section.rb
+++ b/lib/fluent/config/section.rb
@@ -75,6 +75,10 @@ def [](key)
@params[key.to_sym]
end
+ def []=(key, value)
+ @params[key.to_sym] = value
+ end
+
def respond_to?(symbol, include_all=false)
case symbol
when :inspect, :nil?, :to_h, :+, :instance_of?, :kind_of?, :[], :respond_to?, :respond_to_missing?
diff --git a/lib/fluent/configurable.rb b/lib/fluent/configurable.rb
index 4b8c6ac6ca..7d5d4cf60b 100644
--- a/lib/fluent/configurable.rb
+++ b/lib/fluent/configurable.rb
@@ -48,12 +48,8 @@ def initialize
end
end
- def configure(conf)
- @config = conf
-
- logger = self.respond_to?(:log) ? log : (defined?($log) ? $log : nil)
+ def configure_proxy_generate
proxy = self.class.merged_configure_proxy
- conf.corresponding_proxies << proxy
if self.respond_to?(:owner) && self.owner
owner_proxy = owner.class.merged_configure_proxy
@@ -63,6 +59,36 @@ def configure(conf)
proxy.overwrite_defaults(owner_proxy) if owner_proxy
end
+ proxy
+ end
+
+ def configured_section_create(name, conf = nil)
+ conf ||= Fluent::Config::Element.new(name.to_s, '', {}, [])
+ root_proxy = configure_proxy_generate
+ proxy = if name.nil? # root
+ root_proxy
+ else
+ root_proxy.sections[name]
+ end
+ # take care to raise Fluent::ConfigError if conf mismatched to proxy
+ Fluent::Config::SectionGenerator.generate(proxy, conf, nil, nil)
+ end
+
+ def configure(conf)
+ @config = conf
+
+ logger = if self.respond_to?(:log)
+ self.log
+ elsif self.respond_to?(:owner) && self.owner.respond_to?(:log)
+ self.owner.log
+ elsif defined?($log)
+ $log
+ else
+ nil
+ end
+ proxy = configure_proxy_generate
+ conf.corresponding_proxies << proxy
+
# In the nested section, can't get plugin class through proxies so get plugin class here
plugin_class = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s)
root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class)
diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb
index b6786608c7..070d6e480f 100644
--- a/lib/fluent/plugin/in_forward.rb
+++ b/lib/fluent/plugin/in_forward.rb
@@ -154,7 +154,7 @@ def start
shared_socket = system_config.workers > 1
- log.info "listening a tcp port", port: @port, bind: @bind
+ log.info "listening port", port: @port, bind: @bind
server_create_connection(
:in_forward_server, @port,
bind: @bind,
diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb
index 374ae3b6cc..428eb5258a 100644
--- a/lib/fluent/plugin/in_tcp.rb
+++ b/lib/fluent/plugin/in_tcp.rb
@@ -56,7 +56,7 @@ def start
super
@buffer = ''
- server_create(:in_tcp_server, @port, proto: :tcp, bind: @bind) do |data, conn|
+ server_create(:in_tcp_server, @port, bind: @bind) do |data, conn|
@buffer << data
begin
pos = 0
diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index 848c8f2cc2..fb9d642d1e 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -33,12 +33,17 @@ class ConnectionClosedError < Error; end
LISTEN_PORT = 24224
+ desc 'The transport protocol.'
+ config_param :transport, :enum, list: [:tcp, :tls], default: :tcp
+ # TODO: TLS session cache/tickets
+ # TODO: Connection keepalive
+
desc 'The timeout time when sending event logs.'
config_param :send_timeout, :time, default: 60
# TODO: add linger_timeout, recv_timeout
- desc 'The transport protocol to use for heartbeats.(udp,tcp,none)'
- config_param :heartbeat_type, :enum, list: [:tcp, :udp, :none], default: :tcp
+ desc 'The protocol to use for heartbeats (default is the same with "transport").'
+ config_param :heartbeat_type, :enum, list: [:transport, :tcp, :udp, :none], default: :transport
desc 'The interval of the heartbeat packer.'
config_param :heartbeat_interval, :time, default: 1
desc 'The wait time before accepting a server fault recovery.'
@@ -75,6 +80,19 @@ class ConnectionClosedError < Error; end
desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text
+ desc 'The default version of TLS transport.'
+ config_param :tls_version, :enum, list: Fluent::PluginHelper::Socket::TLS_SUPPORTED_VERSIONS, default: Fluent::PluginHelper::Socket::TLS_DEFAULT_VERSION
+ desc 'The cipher configuration of TLS transport.'
+ config_param :tls_ciphers, :string, default: Fluent::PluginHelper::Socket::CIPHERS_DEFAULT
+ desc 'Skip all verification of certificates or not.'
+ config_param :tls_insecure_mode, :bool, default: false
+ desc 'Allow self signed certificates or not.'
+ config_param :tls_allow_self_signed_cert, :bool, default: false
+ desc 'Verify hostname of servers and certificates or not in TLS transport.'
+ config_param :tls_verify_hostname, :bool, default: true
+ desc 'The additional CA certificate path for TLS.'
+ config_param :tls_cert_path, :array, value_type: :string, default: nil
+
config_section :security, required: false, multi: false do
desc 'The hostname'
config_param :self_hostname, :string
@@ -85,7 +103,7 @@ class ConnectionClosedError < Error; end
config_section :server, param_name: :servers do
desc "The IP address or host name of the server."
config_param :host, :string
- desc "The name of the server. Used in log messages."
+ desc "The name of the server. Used for logging and certificate verification in TLS transport (when host is address)."
config_param :name, :string, default: nil
desc "The port number of the host."
config_param :port, :integer, default: LISTEN_PORT
@@ -136,9 +154,29 @@ def configure(conf)
@read_interval = @read_interval_msec / 1000.0
@recover_sample_size = @recover_wait / @heartbeat_interval
+ if @heartbeat_type == :tcp
+ log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
+ @heartbeat_type = :transport
+ end
+
if @dns_round_robin
if @heartbeat_type == :udp
- raise Fluent::ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
+ raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
+ end
+ end
+
+ if @transport == :tls
+ if @tls_cert_path && !@tls_cert_path.empty?
+ @tls_cert_path.each do |path|
+ raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
+ raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
+ end
+ end
+
+ if @tls_insecure_mode
+ log.warn "TLS transport is configured in insecure way"
+ @tls_verify_hostname = false
+ @tls_allow_self_signed_cert = true
end
end
@@ -275,14 +313,34 @@ def select_a_healthy_node
raise NoNodesAvailable, "no nodes are available"
end
- def create_transfer_socket(host, port, &block)
- socket_create_tcp(
- host, port,
- linger_timeout: @send_timeout,
- send_timeout: @send_timeout,
- recv_timeout: @ack_response_timeout,
- &block
- )
+ def create_transfer_socket(host, port, hostname, &block)
+ case @transport
+ when :tls
+ socket_create_tls(
+ host, port,
+ version: @tls_version,
+ ciphers: @tls_ciphers,
+ insecure: @tls_insecure_mode,
+ verify_fqdn: @tls_verify_hostname,
+ fqdn: hostname,
+ allow_self_signed_cert: @tls_allow_self_signed_cert,
+ cert_paths: @tls_cert_path,
+ linger_timeout: @send_timeout,
+ send_timeout: @send_timeout,
+ recv_timeout: @ack_response_timeout,
+ &block
+ )
+ when :tcp
+ socket_create_tcp(
+ host, port,
+ linger_timeout: @send_timeout,
+ send_timeout: @send_timeout,
+ recv_timeout: @ack_response_timeout,
+ &block
+ )
+ else
+ raise "BUG: unknown transport protocol #{@transport}"
+ end
end
# MessagePack FixArray length is 3
@@ -458,6 +516,14 @@ def initialize(sender, server, failure:)
@available = true
@state = nil
+ # @hostname is used for certificate verification & TLS SNI
+ host_is_hostname = !(IPAddr.new(@host) rescue false)
+ @hostname = case
+ when host_is_hostname then @host
+ when @name then @name
+ else nil
+ end
+
@usock = nil
@username = server.username
@@ -557,7 +623,7 @@ def send_data_actual(sock, tag, chunk)
end
def send_data(tag, chunk)
- sock = @sender.create_transfer_socket(resolved_host, port)
+ sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
begin
send_data_actual(sock, tag, chunk)
rescue
@@ -589,8 +655,8 @@ def send_heartbeat
end
case @sender.heartbeat_type
- when :tcp
- @sender.create_transfer_socket(dest_addr, port) do |sock|
+ when :transport
+ @sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
## don't send any data to not cause a compatibility problem
# sock.write FORWARD_TCP_HEARTBEAT_DATA
diff --git a/lib/fluent/plugin_helper/cert_option.rb b/lib/fluent/plugin_helper/cert_option.rb
new file mode 100644
index 0000000000..6742142175
--- /dev/null
+++ b/lib/fluent/plugin_helper/cert_option.rb
@@ -0,0 +1,159 @@
+#
+# Fluentd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'openssl'
+require 'socket'
+
+# this module is only for Socket/Server plugin helpers
+module Fluent
+ module PluginHelper
+ module CertOption
+ def cert_option_create_context(version, insecure, ciphers, conf)
+ cert, key, extra = cert_option_server_validate!(conf)
+
+ ctx = OpenSSL::SSL::SSLContext.new(version)
+ unless insecure
+ # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
+ # https://bugs.ruby-lang.org/issues/9424
+ ctx.set_params({})
+
+ ctx.ciphers = ciphers
+ end
+
+ ctx.cert = cert
+ ctx.key = key
+ if extra && !extra.empty?
+ ctx.extra_chain_cert = extra
+ end
+
+ ctx
+ end
+
+ def cert_option_server_validate!(conf)
+ case
+ when conf.cert_path
+ raise Fluent::ConfigError, "private_key_path is required when cert_path is specified" unless conf.private_key_path
+ raise Fluent::ConfigError, "private_key_passphrase is required when cert_path is specified" unless conf.private_key_passphrase
+ cert_option_load(conf.cert_path, conf.private_key_path, conf.private_key_passphrase)
+
+ when conf.ca_cert_path
+ raise Fluent::ConfigError, "ca_private_key_path is required when ca_cert_path is specified" unless conf.ca_private_key_path
+ raise Fluent::ConfigError, "ca_private_key_passphrase is required when ca_cert_path is specified" unless conf.ca_private_key_passphrase
+ generate_opts = cert_option_cert_generation_opts_from_conf(conf)
+ cert_option_generate_server_pair_by_ca(
+ conf.ca_cert_path,
+ conf.ca_private_key_path,
+ conf.ca_private_key_passphrase,
+ generate_opts
+ )
+
+ when conf.insecure
+ log.warn "insecure TLS communication server is configured (using 'insecure' mode)"
+ generate_opts = cert_option_cert_generation_opts_from_conf(conf)
+ cert_option_generate_server_pair_self_signed(generate_opts)
+
+ else
+ raise Fluent::ConfigError, "no valid cert options configured. specify either 'cert_path', 'ca_cert_path' or 'insecure'"
+ end
+ end
+
+ def cert_option_load(cert_path, private_key_path, private_key_passphrase)
+ key = OpenSSL::PKey::RSA.new(File.read(private_key_path), private_key_passphrase)
+ certs = cert_option_certificates_from_file(cert_path)
+ cert = certs.shift
+ return cert, key, certs
+ end
+
+ def cert_option_cert_generation_opts_from_conf(conf)
+ {
+ private_key_length: conf.generate_private_key_length,
+ country: conf.generate_cert_country,
+ state: conf.generate_cert_state,
+ locality: conf.generate_cert_locality,
+ common_name: conf.generate_cert_common_name || ::Socket.gethostname,
+ expiration: conf.generate_cert_expiration,
+ digest: conf.generate_cert_digest,
+ }
+ end
+
+ def cert_option_generate_pair(opts, issuer = nil)
+ key = OpenSSL::PKey::RSA.generate(opts[:private_key_length])
+
+ subject = OpenSSL::X509::Name.new
+ subject.add_entry('C', opts[:country])
+ subject.add_entry('ST', opts[:state])
+ subject.add_entry('L', opts[:locality])
+ subject.add_entry('CN', opts[:common_name])
+
+ issuer ||= subject
+
+ cert = OpenSSL::X509::Certificate.new
+ cert.not_before = Time.at(0)
+ cert.not_after = Time.now + opts[:expiration]
+ cert.public_key = key
+ cert.serial = 1
+ cert.issuer = issuer
+ cert.subject = subject
+
+ return cert, key
+ end
+
+ def cert_option_generate_ca_pair_self_signed(generate_opts)
+ cert, key = cert_option_generate_pair(generate_opts)
+
+ # basicConstraints: this cert is for CA or not
+ cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(true)]))
+
+ cert.sign(key, generate_opts[:digest].to_s)
+ return cert, key
+ end
+
+ def cert_option_generate_server_pair_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, generate_opts)
+ ca_key = OpenSSL::PKey::RSA.new(File.read(ca_key_path), ca_key_passphrase)
+ ca_cert = OpenSSL::X509::Certificate.new(File.read(ca_cert_path))
+ cert, key = cert_option_generate_pair(generate_opts, ca_cert.subject)
+ raise "BUG: certificate digest algorithm not set" unless generate_opts[:digest]
+
+ # basicConstraints: this cert is for CA or not
+ cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(false)]))
+ cert.add_extension OpenSSL::X509::Extension.new('nsCertType', 'server')
+
+ cert.sign(ca_key, generate_opts[:digest].to_s)
+ return cert, key, nil
+ end
+
+ def cert_option_generate_server_pair_self_signed(generate_opts)
+ cert, key = cert_option_generate_pair(generate_opts)
+ raise "BUG: certificate digest algorithm not set" unless generate_opts[:digest]
+
+ # basicConstraints: this cert is for CA or not
+ cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(false)]))
+ cert.add_extension OpenSSL::X509::Extension.new('nsCertType', 'server')
+
+ cert.sign(key, generate_opts[:digest].to_s)
+ return cert, key, nil
+ end
+
+ def cert_option_certificates_from_file(path)
+ data = File.read(path)
+ pattern = Regexp.compile('-+BEGIN CERTIFICATE-+\n(?:[^-]*\n)+-+END CERTIFICATE-+\n', Regexp::MULTILINE)
+ list = []
+ data.scan(pattern){|match| list << OpenSSL::X509::Certificate.new(match) }
+ list
+ end
+ end
+ end
+end
diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb
index d13fdd9f3a..ffdb57dab1 100644
--- a/lib/fluent/plugin_helper/server.rb
+++ b/lib/fluent/plugin_helper/server.rb
@@ -21,18 +21,22 @@
require 'socket'
require 'ipaddr'
require 'fcntl'
+require 'openssl'
require_relative 'socket_option'
+require_relative 'cert_option'
module Fluent
module PluginHelper
module Server
include Fluent::PluginHelper::EventLoop
include Fluent::PluginHelper::SocketOption
+ include Fluent::PluginHelper::CertOption
# This plugin helper doesn't support these things for now:
- # * SSL/TLS (TBD)
# * TCP/TLS keepalive
+ # * TLS session cache/tickets
+ # * unix domain sockets
# stop : [-]
# shutdown : detach server event handler from event loop (event_loop)
@@ -63,12 +67,16 @@ def server_wait_until_stop
# conn.close
# end
# end
- def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, **socket_options, &block)
+ def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
+ proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp
+
raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto)
+ raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls
+
raise ArgumentError, "BUG: block not specified which handles connection" unless block_given?
raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1
@@ -83,11 +91,14 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared:
when :tcp
server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
when :tls
- raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS" unless certopts
- # server_certopts_validate!(certopts)
- # sock = server_create_tls_socket(shared, bind, port)
- # server = nil # ...
- raise "not implemented yet"
+ transport_config = if tls_options
+ server_create_transport_section_object(tls_options)
+ elsif @transport_config && @transport_config.protocol == :tls
+ @transport_config
+ else
+ raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
+ end
+ server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block)
when :unix
raise "not implemented yet"
else
@@ -108,12 +119,15 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared:
# sock.remote_port
# # ...
# end
- def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
+ def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
+ proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp
+
raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp
+ raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls
raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2
@@ -141,7 +155,16 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, socke
conn.data(&callback)
end
when :tls
- raise "not implemented yet"
+ transport_config = if tls_options
+ server_create_transport_section_object(tls_options)
+ elsif @transport_config && @transport_config.protocol == :tls
+ @transport_config
+ else
+ raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
+ end
+ server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn|
+ conn.data(&callback)
+ end
when :udp
raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
if socket
@@ -198,6 +221,80 @@ def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_
server
end
+ def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
+ context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)
+ sock = server_create_tcp_socket(shared, bind, port)
+ socket_option_setter.call(sock)
+ close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
+ server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
+ @_server_mutex.synchronize do
+ @_server_connections << conn
+ end
+ end
+ server.listen(backlog) if backlog
+ server
+ end
+
+ SERVER_TRANSPORT_PARAMS = [
+ :protocol, :version, :ciphers, :insecure,
+ :cert_path, :private_key_path, :private_key_passphrase,
+ :ca_cert_path, :ca_private_key_path, :ca_private_key_passphrase,
+ :generate_private_key_length,
+ :generate_cert_country, :generate_cert_state, :generate_cert_state,
+ :generate_cert_locality, :generate_cert_common_name,
+ :generate_cert_expiration, :generate_cert_digest,
+ ]
+
+ def server_create_transport_section_object(opts)
+ transport_section = configured_section_create(:transport)
+ SERVER_TRANSPORT_PARAMS.each do |param|
+ if opts.has_key?(param)
+ transport_section[param] = opts[param]
+ end
+ end
+ transport_section
+ end
+
+ module ServerTransportParams
+ TLS_DEFAULT_VERSION = :'TLSv1_2'
+ TLS_SUPPORTED_VERSIONS = [:'TLSv1_1', :'TLSv1_2']
+ ### follow httpclient configuration by nahi
+ # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
+ CIPHERS_DEFAULT = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
+
+ include Fluent::Configurable
+ config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
+ config_argument :protocol, :enum, list: [:tcp, :tls], default: :tcp
+ config_param :version, :enum, list: TLS_SUPPORTED_VERSIONS, default: TLS_DEFAULT_VERSION
+
+ config_param :ciphers, :string, default: CIPHERS_DEFAULT
+ config_param :insecure, :bool, default: false
+
+ # Cert signed by public CA
+ config_param :cert_path, :string, default: nil
+ config_param :private_key_path, :string, default: nil
+ config_param :private_key_passphrase, :string, default: nil, secret: true
+
+ # Cert generated and signed by private CA Certificate
+ config_param :ca_cert_path, :string, default: nil
+ config_param :ca_private_key_path, :string, default: nil
+ config_param :ca_private_key_passphrase, :string, default: nil, secret: true
+
+ # Options for generating certs by private CA certs or self-signed
+ config_param :generate_private_key_length, :integer, default: 2048
+ config_param :generate_cert_country, :string, default: 'US'
+ config_param :generate_cert_state, :string, default: 'CA'
+ config_param :generate_cert_locality, :string, default: 'Mountain View'
+ config_param :generate_cert_common_name, :string, default: nil
+ config_param :generate_cert_expiration, :time, default: 10 * 365 * 86400 # 10years later
+ config_param :generate_cert_digest, :enum, list: [:sha1, :sha256, :sha384, :sha512], default: :sha256
+ end
+ end
+
+ def self.included(mod)
+ mod.include ServerTransportParams
+ end
+
def initialize
super
@_servers = []
@@ -205,6 +302,16 @@ def initialize
@_server_mutex = Mutex.new
end
+ def configure(conf)
+ super
+
+ if @transport_config
+ if @transport_config.protocol == :tls
+ cert_option_server_validate!(@transport_config)
+ end
+ end
+ end
+
def stop
@_server_mutex.synchronize do
@_servers.each do |si|
@@ -230,10 +337,6 @@ def terminate
super
end
- def server_certopts_validate!(certopts)
- raise "not implemented yet"
- end
-
def server_socket_manager_client
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
@@ -267,28 +370,25 @@ def server_create_udp_socket(shared, bind, port)
sock
end
- def server_create_tls_socket(shared, bind, port)
- raise "not implemented yet"
- end
-
class CallbackSocket
def initialize(server_type, sock, enabled_events = [], close_socket: true)
@server_type = server_type
@sock = sock
+ @peeraddr = nil
@enabled_events = enabled_events
@close_socket = close_socket
end
def remote_addr
- @sock.peeraddr[3]
+ @peeraddr[3]
end
def remote_host
- @sock.peeraddr[2]
+ @peeraddr[2]
end
def remote_port
- @sock.peeraddr[1]
+ @peeraddr[1]
end
def send(data, flags = 0)
@@ -327,6 +427,18 @@ def on(event, &callback)
class TCPCallbackSocket < CallbackSocket
def initialize(sock)
super("tcp", sock, [:data, :write_complete, :close])
+ @peeraddr = @sock.peeraddr
+ end
+
+ def write(data)
+ @sock.write(data)
+ end
+ end
+
+ class TLSCallbackSocket < CallbackSocket
+ def initialize(sock)
+ super("tls", sock, [:data, :write_complete, :close])
+ @peeraddr = @sock.to_io.peeraddr
end
def write(data)
@@ -430,6 +542,10 @@ def initialize(sock, socket_option_setter, close_callback, log, under_plugin_dev
@mutex = Mutex.new # to serialize #write and #close
end
+ def to_io
+ @_handler_socket
+ end
+
def data(&callback)
raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
@data_callback = callback
@@ -459,7 +575,140 @@ def on_connect
def on_read_without_connection(data)
@data_callback.call(data)
rescue => e
- @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e
+ @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
+ @log.error_backtrace
+ close(true) rescue nil
+ raise if @under_plugin_development
+ end
+
+ def on_read_with_connection(data)
+ @data_callback.call(data, @callback_connection)
+ rescue => e
+ @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
+ @log.error_backtrace
+ close(true) rescue nil
+ raise if @under_plugin_development
+ end
+
+ def close
+ @mutex.synchronize do
+ return if @closing
+ @closing = true
+ @close_callback.call(self)
+ super
+ end
+ end
+ end
+
+ class TLSServer < Coolio::Socket
+ # It can't use Coolio::TCPSocket, because Coolio::TCPSocket checks that underlying socket (1st argument of super) is TCPSocket.
+ def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
+ raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)
+
+ socket_option_setter.call(sock)
+ @_handler_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
+ @_handler_socket.sync_close = true
+ @_handler_write_buffer = ''.force_encoding('ascii-8bit')
+ @_handler_accepted = false
+ super(@_handler_socket)
+
+ @log = log
+ @under_plugin_development = under_plugin_development
+
+ @connect_callback = connect_callback
+ @data_callback = nil
+ @close_callback = close_callback
+
+ @callback_connection = nil
+ @closing = false
+
+ @mutex = Mutex.new # to serialize #write and #close
+ end
+
+ def to_io
+ @_handler_socket.to_io
+ end
+
+ def data(&callback)
+ raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
+ @data_callback = callback
+ on_read_impl = case callback.arity
+ when 1 then :on_read_without_connection
+ when 2 then :on_read_with_connection
+ else
+ raise "BUG: callback block must have 1 or 2 arguments"
+ end
+ self.define_singleton_method(:on_read, method(on_read_impl))
+ end
+
+ def write(data)
+ @mutex.synchronize do
+ @_handler_write_buffer << data
+ schedule_write
+ data.bytesize
+ end
+ end
+
+ def try_tls_accept
+ return true if @_handler_accepted
+
+ begin
+ @_handler_socket.accept_nonblock # this method call actually try to do handshake via TLS
+ @_handler_accepted = true
+
+ @callback_connection = TLSCallbackSocket.new(self)
+ @connect_callback.call(@callback_connection)
+ unless @data_callback
+ raise "connection callback must call #data to set data callback"
+ end
+ return true
+
+ rescue IO::WaitReadable, IO::WaitWritable
+ # retry accept_nonblock: there aren't enough data in underlying socket buffer
+ rescue OpenSSL::SSL::SSLError => e
+ @log.trace "unexpected error before accepting TLS connection", error: e
+ close rescue nil
+ end
+
+ false
+ end
+
+ def on_connect
+ try_tls_accept
+ end
+
+ def on_readable
+ if try_tls_accept
+ super
+ end
+ rescue IO::WaitReadable, IO::WaitWritable
+ # ignore and return with doing nothing
+ end
+
+ def on_writable
+ begin
+ @mutex.synchronize do
+ written_bytes = @_handler_socket.write_nonblock(@_handler_write_buffer)
+ @_handler_write_buffer.slice!(0, written_bytes)
+ super
+ end
+ rescue IO::WaitWritable, IO::WaitReadable
+ return
+ rescue Errno::EINTR
+ return
+ rescue SystemCallError, IOError, SocketError
+ # SystemCallError catches Errno::EPIPE & Errno::ECONNRESET amongst others.
+ close rescue nil
+ return
+ rescue OpenSSL::SSL::SSLError => e
+ @log.debug "unexpected SSLError while writing data into socket connected via TLS", error: e
+ end
+ end
+
+ def on_read_without_connection(data)
+ @data_callback.call(data)
+ rescue => e
+ @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
@log.error_backtrace
close(true) rescue nil
raise if @under_plugin_development
@@ -468,7 +717,7 @@ def on_read_without_connection(data)
def on_read_with_connection(data)
@data_callback.call(data, @callback_connection)
rescue => e
- @log.error "unexpected error on reading data", host: remote_host, port: remote_port, error: e
+ @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
@log.error_backtrace
close(true) rescue nil
raise if @under_plugin_development
diff --git a/lib/fluent/plugin_helper/socket.rb b/lib/fluent/plugin_helper/socket.rb
index 87a5aaab72..b751fe7b85 100644
--- a/lib/fluent/plugin_helper/socket.rb
+++ b/lib/fluent/plugin_helper/socket.rb
@@ -16,6 +16,7 @@
require 'socket'
require 'ipaddr'
+require 'openssl'
require_relative 'socket_option'
@@ -29,6 +30,12 @@ module Socket
include Fluent::PluginHelper::SocketOption
+ TLS_DEFAULT_VERSION = :'TLSv1_2'
+ TLS_SUPPORTED_VERSIONS = [:'TLSv1_1', :'TLSv1_2']
+ ### follow httpclient configuration by nahi
+ # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
+ CIPHERS_DEFAULT = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
+
attr_reader :_sockets # for tests
# TODO: implement connection pool for specified host
@@ -49,7 +56,7 @@ def socket_create(proto, host, port, **kwargs, &block)
end
def socket_create_tcp(host, port, resolve_name: false, **kwargs, &block)
- sock = TCPSocket.new(host, port)
+ sock = WrappedSocket::TCP.new(host, port)
socket_option_set(sock, resolve_name: resolve_name, **kwargs)
if block
begin
@@ -65,7 +72,7 @@ def socket_create_tcp(host, port, resolve_name: false, **kwargs, &block)
def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs, &block)
family = IPAddr.new(IPSocket.getaddress(host)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6
- sock = UDPSocket.new(family)
+ sock = WrappedSocket::UDP.new(family)
socket_option_set(sock, resolve_name: resolve_name, **kwargs)
sock.connect(host, port) if connect
if block
@@ -79,12 +86,145 @@ def socket_create_udp(host, port, resolve_name: false, connect: false, **kwargs,
end
end
- def socket_create_tls(host, port, resolve_name: false, certopts: {}, &block)
- raise "not implemented yet"
+ def socket_create_tls(
+ host, port,
+ version: TLS_DEFAULT_VERSION, ciphers: CIPHERS_DEFAULT, insecure: false, verify_fqdn: true, fqdn: nil,
+ enable_system_cert_store: true, allow_self_signed_cert: false, cert_paths: nil, **kwargs, &block)
+
+ host_is_ipaddress = IPAddr.new(host) rescue false
+ fqdn ||= host unless host_is_ipaddress
+
+ context = OpenSSL::SSL::SSLContext.new(version)
+
+ if insecure
+ log.trace "setting TLS verify_mode NONE"
+ context.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ else
+ cert_store = OpenSSL::X509::Store.new
+ if allow_self_signed_cert && OpenSSL::X509.const_defined?('V_FLAG_CHECK_SS_SIGNATURE')
+ cert_store.flags = OpenSSL::X509::V_FLAG_CHECK_SS_SIGNATURE
+ end
+ begin
+ if enable_system_cert_store
+ log.trace "loading system default certificate store"
+ cert_store.set_default_paths
+ end
+ rescue OpenSSL::X509::StoreError
+ log.warn "failed to load system default certificate store", error: e
+ end
+ if cert_paths
+ if cert_paths.respond_to?(:each)
+ cert_paths.each do |cert_path|
+ log.trace "adding CA cert", path: cert_path
+ cert_store.add_file(cert_path)
+ end
+ else
+ cert_path = cert_paths
+ log.trace "adding CA cert", path: cert_path
+ cert_store.add_file(cert_path)
+ end
+ end
+
+ log.trace "setting TLS context", mode: "peer", ciphers: ciphers
+ context.set_params({})
+ context.ciphers = ciphers
+ context.verify_mode = OpenSSL::SSL::VERIFY_PEER
+ context.cert_store = cert_store
+ context.verify_hostname = true if verify_fqdn && fqdn && context.respond_to?(:verify_hostname=)
+ end
+
+ tcpsock = socket_create_tcp(host, port, **kwargs)
+ sock = WrappedSocket::TLS.new(tcpsock, context)
+ sock.sync_close = true
+ sock.hostname = fqdn if verify_fqdn && fqdn && sock.respond_to?(:hostname=)
+
+ log.trace "entering TLS handshake"
+ sock.connect
+
+ begin
+ if verify_fqdn
+ log.trace "checking peer's certificate", subject: sock.peer_cert.subject
+ sock.post_connection_check(fqdn)
+ verify = sock.verify_result
+ if verify != OpenSSL::X509::V_OK
+ err_name = Socket.tls_verify_result_name(verify)
+ log.warn "BUG: failed to verify certification while connecting (but not raised, why?)", host: host, fqdn: fqdn, error: err_name
+ raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{host} as #{fqdn}"
+ end
+ end
+ rescue OpenSSL::SSL::SSLError => e
+ log.warn "failed to verify certification while connecting tls session", host: host, fqdn: fqdn, error: e
+ raise
+ end
+
+ if block
+ begin
+ block.call(sock)
+ ensure
+ sock.close rescue nil
+ end
+ else
+ sock
+ end
+ end
+
+ def self.tls_verify_result_name(code)
+ case code
+ when OpenSSL::X509::V_OK then 'V_OK'
+ when OpenSSL::X509::V_ERR_AKID_SKID_MISMATCH then 'V_ERR_AKID_SKID_MISMATCH'
+ when OpenSSL::X509::V_ERR_APPLICATION_VERIFICATION then 'V_ERR_APPLICATION_VERIFICATION'
+ when OpenSSL::X509::V_ERR_CERT_CHAIN_TOO_LONG then 'V_ERR_CERT_CHAIN_TOO_LONG'
+ when OpenSSL::X509::V_ERR_CERT_HAS_EXPIRED then 'V_ERR_CERT_HAS_EXPIRED'
+ when OpenSSL::X509::V_ERR_CERT_NOT_YET_VALID then 'V_ERR_CERT_NOT_YET_VALID'
+ when OpenSSL::X509::V_ERR_CERT_REJECTED then 'V_ERR_CERT_REJECTED'
+ when OpenSSL::X509::V_ERR_CERT_REVOKED then 'V_ERR_CERT_REVOKED'
+ when OpenSSL::X509::V_ERR_CERT_SIGNATURE_FAILURE then 'V_ERR_CERT_SIGNATURE_FAILURE'
+ when OpenSSL::X509::V_ERR_CERT_UNTRUSTED then 'V_ERR_CERT_UNTRUSTED'
+ when OpenSSL::X509::V_ERR_CRL_HAS_EXPIRED then 'V_ERR_CRL_HAS_EXPIRED'
+ when OpenSSL::X509::V_ERR_CRL_NOT_YET_VALID then 'V_ERR_CRL_NOT_YET_VALID'
+ when OpenSSL::X509::V_ERR_CRL_SIGNATURE_FAILURE then 'V_ERR_CRL_SIGNATURE_FAILURE'
+ when OpenSSL::X509::V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT then 'V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT'
+ when OpenSSL::X509::V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD then 'V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD'
+ when OpenSSL::X509::V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD then 'V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD'
+ when OpenSSL::X509::V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD then 'V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD'
+ when OpenSSL::X509::V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD then 'V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD'
+ when OpenSSL::X509::V_ERR_INVALID_CA then 'V_ERR_INVALID_CA'
+ when OpenSSL::X509::V_ERR_INVALID_PURPOSE then 'V_ERR_INVALID_PURPOSE'
+ when OpenSSL::X509::V_ERR_KEYUSAGE_NO_CERTSIGN then 'V_ERR_KEYUSAGE_NO_CERTSIGN'
+ when OpenSSL::X509::V_ERR_OUT_OF_MEM then 'V_ERR_OUT_OF_MEM'
+ when OpenSSL::X509::V_ERR_PATH_LENGTH_EXCEEDED then 'V_ERR_PATH_LENGTH_EXCEEDED'
+ when OpenSSL::X509::V_ERR_SELF_SIGNED_CERT_IN_CHAIN then 'V_ERR_SELF_SIGNED_CERT_IN_CHAIN'
+ when OpenSSL::X509::V_ERR_SUBJECT_ISSUER_MISMATCH then 'V_ERR_SUBJECT_ISSUER_MISMATCH'
+ when OpenSSL::X509::V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY then 'V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY'
+ when OpenSSL::X509::V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE then 'V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY'
+ when OpenSSL::X509::V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE then 'V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE'
+ when OpenSSL::X509::V_ERR_UNABLE_TO_GET_CRL then 'V_ERR_UNABLE_TO_GET_CRL'
+ when OpenSSL::X509::V_ERR_UNABLE_TO_GET_ISSUER_CERT then 'V_ERR_UNABLE_TO_GET_ISSUER_CERT'
+ when OpenSSL::X509::V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY then 'V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY'
+ when OpenSSL::X509::V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE then 'V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE'
+ end
end
# socket_create_socks ?
+ module WrappedSocket
+ class TCP < ::TCPSocket
+ def remote_addr; peeraddr[3]; end
+ def remote_host; peeraddr[2]; end
+ def remote_port; peeraddr[1]; end
+ end
+ class UDP < ::UDPSocket
+ def remote_addr; peeraddr[3]; end
+ def remote_host; peeraddr[2]; end
+ def remote_port; peeraddr[1]; end
+ end
+ class TLS < OpenSSL::SSL::SSLSocket
+ def remote_addr; peeraddr[3]; end
+ def remote_host; peeraddr[2]; end
+ def remote_port; peeraddr[1]; end
+ end
+ end
+
def initialize
super
# @_sockets = [] # for keepalived sockets / connection pool
diff --git a/lib/fluent/plugin_helper/socket_option.rb b/lib/fluent/plugin_helper/socket_option.rb
index 82ad59ccf0..77fd9e3b66 100644
--- a/lib/fluent/plugin_helper/socket_option.rb
+++ b/lib/fluent/plugin_helper/socket_option.rb
@@ -24,7 +24,7 @@ module SocketOption
FORMAT_STRUCT_LINGER = 'I!I!' # { int l_onoff; int l_linger; }
FORMAT_STRUCT_TIMEVAL = 'L!L!' # { time_t tv_sec; suseconds_t tv_usec; }
- def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil)
+ def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, recv_timeout: nil, send_timeout: nil)
unless resolve_name.nil?
if protocol != :tcp && protocol != :udp && protocol != :tls
raise ArgumentError, "BUG: resolve_name in available for tcp/udp/tls"
@@ -35,23 +35,9 @@ def socket_option_validate!(protocol, resolve_name: nil, linger_timeout: nil, re
raise ArgumentError, "BUG: linger_timeout is available for tcp/tls"
end
end
- if certopts
- if protocol != :tls
- raise ArgumentError, "BUG: certopts is available only for tls"
- end
- else
- if protocol == :tls
- raise ArgumentError, "BUG: certopts (certificate options) not specified for TLS"
- socket_option_certopts_validate!(certopts)
- end
- end
- end
-
- def socket_option_certopts_validate!(certopts)
- raise "not implemented yet"
end
- def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil, certopts: nil)
+ def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout: nil, recv_timeout: nil, send_timeout: nil)
unless resolve_name.nil?
sock.do_not_reverse_lookup = !resolve_name
end
@@ -70,7 +56,6 @@ def socket_option_set(sock, resolve_name: nil, nonblock: false, linger_timeout:
optval = [send_timeout.to_i, 0].pack(FORMAT_STRUCT_TIMEVAL)
socket_option_set_one(sock, :SO_SNDTIMEO, optval)
end
- # TODO: certopts for TLS
sock
end
diff --git a/test/config/test_configurable.rb b/test/config/test_configurable.rb
index 6cf4bc9cf7..66d34b0847 100644
--- a/test/config/test_configurable.rb
+++ b/test/config/test_configurable.rb
@@ -24,6 +24,12 @@ def get_all
end
end
+ class Base1Safe < Base1
+ config_set_default :name1, "basex1"
+ config_set_default :name2, "basex2"
+ config_set_default :opt1, :baz
+ end
+
class Base2 < Base1
config_set_default :name2, "base2"
config_set_default :name4, "base2"
@@ -89,6 +95,40 @@ def get_all
end
end
+ class Base4Safe < Base4
+ # config_section :node, param_name: :nodes do
+ # config_argument :num, :integer
+ # config_param :name, :string, default: "node"
+ # config_param :type, :string, default: "b4"
+ # end
+ # config_section :description1, required: false, multi: false do
+ # config_argument :note, :string, default: "desc1"
+ # config_param :text, :string
+ # end
+ # config_section :description2, required: true, multi: false do
+ # config_argument :note, :string, default: "desc2"
+ # config_param :text, :string
+ # end
+ # config_section :description3, required: true, multi: true do
+ # config_argument :note, default: "desc3" do |val|
+ # "desc3: #{val}"
+ # end
+ # config_param :text, :string
+ # end
+ config_section :node do
+ config_set_default :num, 0
+ end
+ config_section :description1 do
+ config_set_default :text, "teeeext"
+ end
+ config_section :description2 do
+ config_set_default :text, nil
+ end
+ config_section :description3 do
+ config_set_default :text, "yay"
+ end
+ end
+
class Init0
include Fluent::Configurable
config_section :sec1, init: true, multi: false do
@@ -407,6 +447,61 @@ class TestConfigurable < ::Test::Unit::TestCase
end
end
+ sub_test_case '#configured_section_create' do
+ test 'raises configuration error if required param exists but no configuration element is specified' do
+ obj = ConfigurableSpec::Base1.new
+ assert_raise(Fluent::ConfigError.new("'name1' parameter is required")) do
+ obj.configured_section_create(nil)
+ end
+ end
+
+ test 'creates root section with default values if name and config are specified with nil' do
+ obj = ConfigurableSpec::Base1Safe.new
+ root = obj.configured_section_create(nil)
+
+ assert_equal "node", root.node
+ assert_false root.flag1
+ assert_true root.flag2
+ assert_equal "basex1", root.name1
+ assert_equal "basex2", root.name2
+ assert_equal "base1", root.name3
+ assert_equal "base1", root.name4
+ assert_equal :baz, root.opt1
+ assert_equal :foo, root.opt2
+ end
+
+ test 'creates root section with default values if name is nil and config is empty element' do
+ obj = ConfigurableSpec::Base1Safe.new
+ root = obj.configured_section_create(nil, config_element())
+
+ assert_equal "node", root.node
+ assert_false root.flag1
+ assert_true root.flag2
+ assert_equal "basex1", root.name1
+ assert_equal "basex2", root.name2
+ assert_equal "base1", root.name3
+ assert_equal "base1", root.name4
+ assert_equal :baz, root.opt1
+ assert_equal :foo, root.opt2
+ end
+
+ test 'creates root section with specified value if name is nil and configuration element is specified' do
+ obj = ConfigurableSpec::Base1Safe.new
+ root = obj.configured_section_create(nil, config_element('match', '', {'node' => "nodename", 'flag1' => 'true', 'name1' => 'fixed1', 'opt1' => 'foo'}))
+
+ assert_equal "nodename", root.node
+ assert_equal "fixed1", root.name1
+ assert_true root.flag1
+ assert_equal :foo, root.opt1
+
+ assert_true root.flag2
+ assert_equal "basex2", root.name2
+ assert_equal "base1", root.name3
+ assert_equal "base1", root.name4
+ assert_equal :foo, root.opt2
+ end
+ end
+
sub_test_case '#configure' do
test 'returns configurable object itself' do
b2 = ConfigurableSpec::Base2.new
@@ -526,6 +621,65 @@ class TestConfigurable < ::Test::Unit::TestCase
end
end
+ sub_test_case '#configured_section_create' do
+ test 'raises configuration error if required param exists but no configuration element is specified' do
+ obj = ConfigurableSpec::Base4.new
+ assert_raise(Fluent::ConfigError.new("'' section requires argument")) do
+ obj.configured_section_create(:node)
+ end
+ assert_raise(Fluent::ConfigError.new("'text' parameter is required")) do
+ obj.configured_section_create(:description1)
+ end
+ end
+
+ test 'creates any defined section with default values if name is nil and config is not specified' do
+ obj = ConfigurableSpec::Base4Safe.new
+ node = obj.configured_section_create(:node)
+ assert_equal 0, node.num
+ assert_equal "node", node.name
+ assert_equal "b4", node.type
+
+ desc1 = obj.configured_section_create(:description1)
+ assert_equal "desc1", desc1.note
+ assert_equal "teeeext", desc1.text
+ end
+
+ test 'creates any defined section with default values if name is nil and config is empty element' do
+ obj = ConfigurableSpec::Base4Safe.new
+ node = obj.configured_section_create(:node, config_element())
+ assert_equal 0, node.num
+ assert_equal "node", node.name
+ assert_equal "b4", node.type
+
+ desc1 = obj.configured_section_create(:description1, config_element())
+ assert_equal "desc1", desc1.note
+ assert_equal "teeeext", desc1.text
+ end
+
+ test 'creates any defined section with specified value if name is nil and configuration element is specified' do
+ obj = ConfigurableSpec::Base4Safe.new
+ node = obj.configured_section_create(:node, config_element('node', '1', {'name' => 'node1', 'type' => 'b1'}))
+ assert_equal 1, node.num
+ assert_equal "node1", node.name
+ assert_equal "b1", node.type
+
+ desc1 = obj.configured_section_create(:description1, config_element('description1', 'desc one', {'text' => 't'}))
+ assert_equal "desc one", desc1.note
+ assert_equal "t", desc1.text
+ end
+
+ test 'creates a defined section instance even if it is defined as multi:true' do
+ obj = ConfigurableSpec::Base4Safe.new
+ desc3 = obj.configured_section_create(:description3)
+ assert_equal "desc3", desc3.note
+ assert_equal "yay", desc3.text
+
+ desc3 = obj.configured_section_create(:description3, config_element('description3', 'foo'))
+ assert_equal "desc3: foo", desc3.note
+ assert_equal "yay", desc3.text
+ end
+ end
+
sub_test_case '#configure' do
BASE_ATTRS = {
"name1" => "1", "name2" => "2", "name3" => "3",
diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb
index 3d0082b1e6..f31c3c4508 100644
--- a/test/plugin/test_out_forward.rb
+++ b/test/plugin/test_out_forward.rb
@@ -72,7 +72,7 @@ def read_ack_from_sock(sock, unpacker)
])
nodes = d.instance.nodes
assert_equal 60, d.instance.send_timeout
- assert_equal :tcp, d.instance.heartbeat_type
+ assert_equal :transport, d.instance.heartbeat_type
assert_equal 1, nodes.length
node = nodes.first
assert_equal "test", node.name
@@ -119,8 +119,8 @@ def read_ack_from_sock(sock, unpacker)
end
end
- test 'configure_dns_round_robin tcp' do
- @d = d = create_driver(CONFIG + "\nheartbeat_type tcp\ndns_round_robin true")
+ test 'configure_dns_round_robin transport' do
+ @d = d = create_driver(CONFIG + "\nheartbeat_type transport\ndns_round_robin true")
assert_equal true, d.instance.dns_round_robin
end
@@ -655,7 +655,8 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG
usock = d.instance.instance_variable_get(:@usock)
servers = d.instance.instance_variable_get(:@_servers)
timers = d.instance.instance_variable_get(:@_timers)
- assert_equal UDPSocket, usock.class
+ assert_equal Fluent::PluginHelper::Socket::WrappedSocket::UDP, usock.class
+ assert_kind_of UDPSocket, usock
assert servers.find{|s| s.title == :out_forward_heartbeat_receiver }
assert timers.include?(:out_forward_heartbeat_request)
diff --git a/test/plugin_helper/test_server.rb b/test/plugin_helper/test_server.rb
index ae906de967..18a4713105 100644
--- a/test/plugin_helper/test_server.rb
+++ b/test/plugin_helper/test_server.rb
@@ -1,5 +1,6 @@
require_relative '../helper'
require 'fluent/plugin_helper/server'
+require 'fluent/plugin_helper/cert_option' # to create certs for tests
require 'fluent/plugin/base'
require 'timeout'
@@ -11,6 +12,8 @@ class Dummy < Fluent::Plugin::TestBase
helpers :server
end
+ TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/plugin_helper_server")
+
PORT = unused_port
setup do
@@ -141,11 +144,49 @@ class Dummy < Fluent::Plugin::TestBase
assert created_server.is_a?(Coolio::TCPServer)
end
- # tests about "proto: :udp" is in #server_create
+ data(methods)
+ test 'creates tls server in default if transport section and tcp protocol specified' do |m|
+ @d = d = Dummy.new
+ transport_conf = config_element('transport', 'tcp', {}, [])
+ d.configure(config_element('ROOT', '', {}, [transport_conf]))
+ d.start
+ d.after_start
+
+ d.__send__(m, :myserver, PORT){|x| x }
+
+ created_server_info = @d._servers.first
+ assert_equal :tcp, created_server_info.proto
+ created_server = created_server_info.server
+ assert created_server.is_a?(Coolio::TCPServer)
+ end
data(methods)
test 'creates tls server if specified in proto' do |m|
- # pend "not implemented yet"
+ assert_raise(ArgumentError.new("BUG: TLS transport specified, but certification options are not specified")) do
+ @d.__send__(m, :myserver, PORT, proto: :tls){|x| x }
+ end
+ @d.__send__(m, :myserver, PORT, proto: :tls, tls_options: {insecure: true}){|x| x }
+
+ created_server_info = @d._servers.first
+ assert_equal :tls, created_server_info.proto
+ created_server = created_server_info.server
+ assert created_server.is_a?(Coolio::TCPServer) # yes, TCP here
+ end
+
+ data(methods)
+ test 'creates tls server in default if transport section and tls protocol specified' do |m|
+ @d = d = Dummy.new
+ transport_conf = config_element('transport', 'tls', {'insecure' => 'true'}, [])
+ d.configure(config_element('ROOT', '', {}, [transport_conf]))
+ d.start
+ d.after_start
+
+ d.__send__(m, :myserver, PORT){|x| x }
+
+ created_server_info = @d._servers.first
+ assert_equal :tls, created_server_info.proto
+ created_server = created_server_info.server
+ assert created_server.is_a?(Coolio::TCPServer) # OK, it's Coolio::TCPServer
end
data(methods)
@@ -162,10 +203,10 @@ class Dummy < Fluent::Plugin::TestBase
data(
'server_create tcp' => [:server_create, :tcp],
- # 'server_create tls' => [:server_create, :tls],
+ 'server_create tls' => [:server_create, :tls],
# 'server_create unix' => [:server_create, :unix],
'server_create_connection tcp' => [:server_create_connection, :tcp],
- # 'server_create_connection tcp' => [:server_create_connection, :tls],
+ 'server_create_connection tls' => [:server_create_connection, :tls],
# 'server_create_connection tcp' => [:server_create_connection, :unix],
)
test 'raise error if udp options specified for tcp/tls/unix' do |(m, proto)|
@@ -203,17 +244,17 @@ class Dummy < Fluent::Plugin::TestBase
# 'server_create_connection unix' => [:server_create_connection, :unix, {}],
)
test 'raise error if tls options specified for tcp/udp/unix' do |(m, proto, kwargs)|
- assert_raise(ArgumentError.new("BUG: certopts is available only for tls")) do
- @d.__send__(m, :myserver, PORT, proto: proto, certopts: {}, **kwargs){|x| x }
+ assert_raise(ArgumentError.new("BUG: tls_options is available only for tls")) do
+ @d.__send__(m, :myserver, PORT, proto: proto, tls_options: {}, **kwargs){|x| x }
end
end
data(
'server_create tcp' => [:server_create, :tcp, {}],
'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
- # 'server_create tls' => [:server_create, :tls, {}],
+ 'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
'server_create_connection tcp' => [:server_create_connection, :tcp, {}],
- # 'server_create_connection tls' => [:server_create_connection, :tls, {}],
+ 'server_create_connection tls' => [:server_create_connection, :tls, {tls_options: {insecure: true}}],
)
test 'can bind specified IPv4 address' do |(m, proto, kwargs)|
@d.__send__(m, :myserver, PORT, proto: proto, bind: "127.0.0.1", **kwargs){|x| x }
@@ -224,9 +265,9 @@ class Dummy < Fluent::Plugin::TestBase
data(
'server_create tcp' => [:server_create, :tcp, {}],
'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
- # 'server_create tls' => [:server_create, :tls, {}],
+ 'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
'server_create_connection tcp' => [:server_create_connection, :tcp, {}],
- # 'server_create_connection tls' => [:server_create_connection, :tls, {}],
+ 'server_create_connection tls' => [:server_create_connection, :tls, {tls_options: {insecure: true}}],
)
test 'can bind specified IPv6 address' do |(m, proto, kwargs)| # if available
omit "IPv6 unavailable here" unless ipv6_enabled?
@@ -238,10 +279,10 @@ class Dummy < Fluent::Plugin::TestBase
data(
'server_create tcp' => [:server_create, :tcp, {}],
'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
- # 'server_create tls' => [:server_create, :tls, {}],
+ 'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
# 'server_create unix' => [:server_create, :unix, {}],
'server_create_connection tcp' => [:server_create, :tcp, {}],
- # 'server_create_connection tls' => [:server_create, :tls, {}],
+ 'server_create_connection tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
# 'server_create_connection unix' => [:server_create, :unix, {}],
)
test 'can create 2 or more servers which share same bind address and port if shared option is true' do |(m, proto, kwargs)|
@@ -260,10 +301,10 @@ class Dummy < Fluent::Plugin::TestBase
data(
'server_create tcp' => [:server_create, :tcp, {}],
'server_create udp' => [:server_create, :udp, {max_bytes: 128}],
- # 'server_create tls' => [:server_create, :tls, {}],
+ 'server_create tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
# 'server_create unix' => [:server_create, :unix, {}],
'server_create_connection tcp' => [:server_create, :tcp, {}],
- # 'server_create_connection tls' => [:server_create, :tls, {}],
+ 'server_create_connection tls' => [:server_create, :tls, {tls_options: {insecure: true}}],
# 'server_create_connection unix' => [:server_create, :unix, {}],
)
test 'cannot create 2 or more servers using same bind address and port if shared option is false' do |(m, proto, kwargs)|
@@ -286,7 +327,7 @@ class Dummy < Fluent::Plugin::TestBase
data(
'tcp' => [:tcp, {}],
'udp' => [:udp, {max_bytes: 128}],
- # 'tls' => [:tls, {}],
+ 'tls' => [:tls, {tls_options: {insecure: true}}],
# 'unix' => [:unix, {}],
)
test 'raise error if block argument is not specified or too many' do |(proto, kwargs)|
@@ -428,7 +469,7 @@ class Dummy < Fluent::Plugin::TestBase
end
waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 }
assert_equal "foo\n", received
- assert_equal 1, errors.size
+ assert{ errors.size > 0 } # it might be called twice (or more) when connection was accepted, and then data arrived (or more)
assert_equal "data callback can be registered just once, but registered twice", errors.first.message
end
@@ -696,25 +737,658 @@ class Dummy < Fluent::Plugin::TestBase
end
end
+ module CertUtil
+ extend Fluent::PluginHelper::CertOption
+ end
+
+ def create_ca_options
+ {
+ private_key_length: 2048,
+ country: 'US',
+ state: 'CA',
+ locality: 'Mountain View',
+ common_name: 'ca.testing.fluentd.org',
+ expiration: 30 * 86400,
+ digest: :sha256,
+ }
+ end
+
+ def create_server_options
+ {
+ private_key_length: 2048,
+ country: 'US',
+ state: 'CA',
+ locality: 'Mountain View',
+ common_name: 'server.testing.fluentd.org',
+ expiration: 30 * 86400,
+ digest: :sha256,
+ }
+ end
+
+ def write_cert_and_key(cert_path, cert, key_path, key, passphrase)
+ File.open(cert_path, "w"){|f| f.write(cert.to_pem) }
+ # Encrypt secret key by AES256, and write it in PEM format
+ File.open(key_path, "w"){|f| f.write(key.export(OpenSSL::Cipher.new("AES-256-CBC"), passphrase)) }
+ File.chmod(0600, cert_path, key_path)
+ end
+
+ def create_server_pair_signed_by_self(cert_path, private_key_path, passphrase)
+ cert, key, _ = CertUtil.cert_option_generate_server_pair_self_signed(create_server_options)
+ write_cert_and_key(cert_path, cert, private_key_path, key, passphrase)
+ end
+
+ def create_ca_pair_signed_by_self(cert_path, private_key_path, passphrase)
+ cert, key, _ = CertUtil.cert_option_generate_ca_pair_self_signed(create_ca_options)
+ write_cert_and_key(cert_path, cert, private_key_path, key, passphrase)
+ end
+
+ def create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, passphrase)
+ cert, key, _ = CertUtil.cert_option_generate_server_pair_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, create_server_options)
+ write_cert_and_key(cert_path, cert, private_key_path, key, passphrase)
+ end
+
+ def create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, passphrase)
+ root_cert, root_key, _ = CertUtil.cert_option_generate_ca_pair_self_signed(create_ca_options)
+ write_cert_and_key(ca_cert_path, root_cert, ca_key_path, root_key, ca_key_passphrase)
+
+ intermediate_ca_options = create_ca_options
+ intermediate_ca_options[:common_name] = 'ca2.testing.fluentd.org'
+ chain_cert, chain_key = CertUtil.cert_option_generate_pair(intermediate_ca_options, root_cert.subject)
+ chain_cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(true)]))
+ chain_cert.sign(root_key, "sha256")
+
+ server_cert, server_key, _ = CertUtil.cert_option_generate_pair(create_server_options, chain_cert.subject)
+ server_cert.add_extension OpenSSL::X509::Extension.new('basicConstraints', OpenSSL::ASN1.Sequence([OpenSSL::ASN1::Boolean(false)]))
+ server_cert.add_extension OpenSSL::X509::Extension.new('nsCertType', 'server')
+ server_cert.sign(chain_key, "sha256")
+
+ # write chained cert
+ File.open(cert_path, "w") do |f|
+ f.write server_cert.to_pem
+ f.write chain_cert.to_pem
+ end
+ File.open(private_key_path, "w"){|f| f.write(server_key.export(OpenSSL::Cipher.new("AES-256-CBC"), passphrase)) }
+ File.chmod(0600, cert_path, private_key_path)
+ end
+
+ def open_tls_session(addr, port, verify: true, cert_path: nil, selfsigned: true, hostname: nil)
+ context = OpenSSL::SSL::SSLContext.new
+ context.set_params({})
+ if verify
+ cert_store = OpenSSL::X509::Store.new
+ cert_store.set_default_paths
+ if selfsigned && OpenSSL::X509.const_defined?('V_FLAG_CHECK_SS_SIGNATURE')
+ cert_store.flags = OpenSSL::X509::V_FLAG_CHECK_SS_SIGNATURE
+ end
+ if cert_path
+ cert_store.add_file(cert_path)
+ end
+ context.verify_mode = OpenSSL::SSL::VERIFY_PEER
+ context.cert_store = cert_store
+ if !hostname && context.respond_to?(:verify_hostname=)
+ context.verify_hostname = false # In test code, using hostname to be connected is very difficult
+ end
+ else
+ context.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ end
+
+ sock = OpenSSL::SSL::SSLSocket.new(TCPSocket.new(addr, port), context)
+ sock.hostname = hostname if hostname && sock.respond_to?(:hostname)
+ sock.connect
+ yield sock
+ ensure
+ sock.close rescue nil
+ end
+
+ sub_test_case '#server_create_tls with various certificate options' do
+ setup do
+ @d = Dummy.new # to get plugin not configured/started yet
+
+ @certs_dir = File.join(TMP_DIR, "tls_certs")
+ @server_cert_dir = File.join(@certs_dir, "server")
+ FileUtils.rm_rf @certs_dir
+ FileUtils.mkdir_p @server_cert_dir
+ end
+
+ sub_test_case 'using tls_options arguments to specify cert options' do
+ setup do
+ @d.configure(config_element()); @d.start; @d.after_start
+ end
+
+ test 'create dynamic self-signed cert/key pair (without any verification from clients)' do
+ # insecure
+ tls_options = {
+ protocol: :tls,
+ version: 'TLSv1_2',
+ ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
+ insecure: true,
+ generate_private_key_length: 2048,
+ generate_cert_country: 'US',
+ generate_cert_state: 'CA',
+ generate_cert_locality: 'Mountain View',
+ generate_cert_common_name: 'myserver.testing.fluentd.org',
+ generate_cert_expiration: 10 * 365 * 86400,
+ generate_cert_digest: :sha256,
+ }
+
+ received = ""
+ @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn|
+ received << data
+ end
+ assert_raise "" do
+ open_tls_session('127.0.0.1', PORT) do |sock|
+ sock.post_connection_check('myserver.testing.fluentd.org')
+ # cannot connect ....
+ end
+ end
+ open_tls_session('127.0.0.1', PORT, verify: false) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+
+ test 'load self-signed cert/key pair (files), verified from clients using cert files' do
+ cert_path = File.join(@server_cert_dir, "cert.pem")
+ private_key_path = File.join(@certs_dir, "server.key.pem")
+ private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay"
+ create_server_pair_signed_by_self(cert_path, private_key_path, private_key_passphrase)
+
+ tls_options = {
+ protocol: :tls,
+ version: 'TLSv1_2',
+ ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
+ insecure: false,
+ cert_path: cert_path,
+ private_key_path: private_key_path,
+ private_key_passphrase: private_key_passphrase,
+ }
+ received = ""
+ @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn|
+ received << data
+ end
+ assert_raise "" do
+ open_tls_session('127.0.0.1', PORT) do |sock|
+ sock.post_connection_check('server.testing.fluentd.org')
+ # cannot connect by failing verification without server cert
+ end
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+
+ test 'create dynamic server cert by private CA cert file, verified from clients using CA cert file' do
+ ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
+ ca_key_path = File.join(@certs_dir, "ca.key.pem")
+ ca_key_passphrase = "fooooooooooooooooooooooooo"
+ create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase)
+
+ tls_options = {
+ protocol: :tls,
+ version: 'TLSv1_2',
+ ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
+ insecure: false,
+ ca_cert_path: ca_cert_path,
+ ca_private_key_path: ca_key_path,
+ ca_private_key_passphrase: ca_key_passphrase,
+ generate_private_key_length: 2048,
+ }
+ received = ""
+ @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn|
+ received << data
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+
+ test 'load static server cert by private CA cert file, verified from clients using CA cert file' do
+ ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
+ ca_key_path = File.join(@certs_dir, "ca.key.pem")
+ ca_key_passphrase = "foooooooo"
+ create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase)
+
+ cert_path = File.join(@server_cert_dir, "cert.pem")
+ private_key_path = File.join(@certs_dir, "server.key.pem")
+ private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay"
+ create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase)
+
+ tls_options = {
+ protocol: :tls,
+ version: 'TLSv1_2',
+ ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
+ insecure: false,
+ cert_path: cert_path,
+ private_key_path: private_key_path,
+ private_key_passphrase: private_key_passphrase,
+ }
+ received = ""
+ @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn|
+ received << data
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+
+ test 'load chained server cert by private CA cert file, verified from clients using CA cert file as root' do
+ ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
+ ca_key_path = File.join(@certs_dir, "ca.key.pem")
+ ca_key_passphrase = "foooooooo"
+ cert_path = File.join(@server_cert_dir, "cert.pem")
+ private_key_path = File.join(@certs_dir, "server.key.pem")
+ private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay"
+ create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase)
+
+ tls_options = {
+ protocol: :tls,
+ version: 'TLSv1_2',
+ ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
+ insecure: false,
+ cert_path: cert_path,
+ private_key_path: private_key_path,
+ private_key_passphrase: private_key_passphrase,
+ }
+ received = ""
+ @d.server_create_tls(:s, PORT, tls_options: tls_options) do |data, conn|
+ received << data
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+ end
+
+ sub_test_case 'using configurations to specify cert options' do
+ test 'create dynamic self-signed cert/key pair (without any verification from clients)' do
+ # insecure
+ transport_opts = {
+ 'insecure' => 'true',
+ }
+ transport_conf = config_element('transport', 'tls', transport_opts)
+ conf = config_element('match', 'tag.*', {}, [transport_conf])
+
+ @d.configure(conf); @d.start; @d.after_start
+
+ received = ""
+ @d.server_create_tls(:s, PORT) do |data, conn|
+ received << data
+ end
+ assert_raise "" do
+ open_tls_session('127.0.0.1', PORT) do |sock|
+ sock.post_connection_check('myserver.testing.fluentd.org')
+ # cannot connect ....
+ end
+ end
+ open_tls_session('127.0.0.1', PORT, verify: false) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+
+ test 'load self-signed cert/key pair (files), verified from clients using cert files' do
+ cert_path = File.join(@server_cert_dir, "cert.pem")
+ private_key_path = File.join(@certs_dir, "server.key.pem")
+ private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay"
+ create_server_pair_signed_by_self(cert_path, private_key_path, private_key_passphrase)
+
+ transport_opts = {
+ 'cert_path' => cert_path,
+ 'private_key_path' => private_key_path,
+ 'private_key_passphrase' => private_key_passphrase,
+ }
+ transport_conf = config_element('transport', 'tls', transport_opts)
+ conf = config_element('match', 'tag.*', {}, [transport_conf])
+
+ @d.configure(conf); @d.start; @d.after_start
+
+ received = ""
+ @d.server_create_tls(:s, PORT) do |data, conn|
+ received << data
+ end
+ assert_raise "" do
+ open_tls_session('127.0.0.1', PORT) do |sock|
+ sock.post_connection_check('server.testing.fluentd.org')
+ # cannot connect by failing verification without server cert
+ end
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+
+ test 'create dynamic server cert by private CA cert file, verified from clients using CA cert file' do
+ ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
+ ca_key_path = File.join(@certs_dir, "ca.key.pem")
+ ca_key_passphrase = "fooooooooooooooooooooooooo"
+ create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase)
+
+ transport_opts = {
+ 'ca_cert_path' => ca_cert_path,
+ 'ca_private_key_path' => ca_key_path,
+ 'ca_private_key_passphrase' => ca_key_passphrase,
+ }
+ transport_conf = config_element('transport', 'tls', transport_opts)
+ conf = config_element('match', 'tag.*', {}, [transport_conf])
+
+ @d.configure(conf); @d.start; @d.after_start
+
+ received = ""
+ @d.server_create_tls(:s, PORT) do |data, conn|
+ received << data
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+
+ test 'load static server cert by private CA cert file, verified from clients using CA cert file' do
+ ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
+ ca_key_path = File.join(@certs_dir, "ca.key.pem")
+ ca_key_passphrase = "foooooooo"
+ create_ca_pair_signed_by_self(ca_cert_path, ca_key_path, ca_key_passphrase)
+
+ cert_path = File.join(@server_cert_dir, "cert.pem")
+ private_key_path = File.join(@certs_dir, "server.key.pem")
+ private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay"
+ create_server_pair_signed_by_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase)
+
+ transport_opts = {
+ 'cert_path' => cert_path,
+ 'private_key_path' => private_key_path,
+ 'private_key_passphrase' => private_key_passphrase,
+ }
+ transport_conf = config_element('transport', 'tls', transport_opts)
+ conf = config_element('match', 'tag.*', {}, [transport_conf])
+
+ @d.configure(conf); @d.start; @d.after_start
+
+ received = ""
+ @d.server_create_tls(:s, PORT) do |data, conn|
+ received << data
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+
+ test 'load chained server cert by private CA cert file, verified from clients using CA cert file as root' do
+ ca_cert_path = File.join(@certs_dir, "ca_cert.pem")
+ ca_key_path = File.join(@certs_dir, "ca.key.pem")
+ ca_key_passphrase = "foooooooo"
+ cert_path = File.join(@server_cert_dir, "cert.pem")
+ private_key_path = File.join(@certs_dir, "server.key.pem")
+ private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay"
+ create_server_pair_chained_with_root_ca(ca_cert_path, ca_key_path, ca_key_passphrase, cert_path, private_key_path, private_key_passphrase)
+
+ transport_opts = {
+ 'cert_path' => cert_path,
+ 'private_key_path' => private_key_path,
+ 'private_key_passphrase' => private_key_passphrase,
+ }
+ transport_conf = config_element('transport', 'tls', transport_opts)
+ conf = config_element('match', 'tag.*', {}, [transport_conf])
+
+ @d.configure(conf); @d.start; @d.after_start
+
+ received = ""
+ @d.server_create_tls(:s, PORT) do |data, conn|
+ received << data
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: ca_cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 8 }
+ assert_equal "yay\nfoo\n", received
+ end
+ end
+ end
+
sub_test_case '#server_create_tls' do
- # not implemented yet
+ setup do
+ @certs_dir = File.join(TMP_DIR, "tls_certs")
+ FileUtils.rm_rf @certs_dir
+ FileUtils.mkdir_p @certs_dir
+
+ @server_cert_dir = File.join(@certs_dir, "server")
+ FileUtils.mkdir_p @server_cert_dir
+
+ @cert_path = File.join(@server_cert_dir, "cert.pem")
+ private_key_path = File.join(@certs_dir, "server.key.pem")
+ private_key_passphrase = "yaaaaaaaaaaaaaaaaaaay"
+ create_server_pair_signed_by_self(@cert_path, private_key_path, private_key_passphrase)
+
+ @default_hostname = ::Socket.gethostname
+
+ @tls_options = {
+ protocol: :tls,
+ version: 'TLSv1_2',
+ ciphers: 'ALL:!aNULL:!eNULL:!SSLv2',
+ insecure: false,
+ cert_path: @cert_path,
+ private_key_path: private_key_path,
+ private_key_passphrase: private_key_passphrase,
+ }
+ end
+
+ test 'can accept all keyword arguments valid for tcp/tls server' do
+ assert_nothing_raised do
+ @d.server_create_tls(:s, PORT, bind: '127.0.0.1', shared: false, resolve_name: true, linger_timeout: 10, backlog: 500, tls_options: @tls_options) do |data, conn|
+ # ...
+ end
+ end
+ end
+
+ test 'creates a tls server just to read data' do
+ received = ""
+ @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn|
+ received << data
+ end
+ 3.times do
+ open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ end
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 24 }
+ assert_equal 3, received.scan("yay\n").size
+ assert_equal 3, received.scan("foo\n").size
+ end
+
+ test 'creates a tls server to read and write data' do
+ received = ""
+ responses = []
+ @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn|
+ received << data
+ conn.write "ack\n"
+ end
+ 3.times do
+ # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock|
+ open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ responses << sock.readline
+ end
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 24 }
+ assert_equal 3, received.scan("yay\n").size
+ assert_equal 3, received.scan("foo\n").size
+ assert_equal ["ack\n","ack\n","ack\n"], responses
+ end
+
+ test 'creates a tls server to read and write data using IPv6' do
+ omit "IPv6 unavailable here" unless ipv6_enabled?
+
+ received = ""
+ responses = []
+ @d.server_create_tls(:s, PORT, bind: "::1", tls_options: @tls_options) do |data, conn|
+ received << data
+ conn.write "ack\n"
+ end
+ 3.times do
+ # open_tls_session('::1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock|
+ open_tls_session('::1', PORT, cert_path: @cert_path) do |sock|
+ sock.puts "yay"
+ sock.puts "foo"
+ responses << sock.readline
+ end
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 24 }
+ assert_equal 3, received.scan("yay\n").size
+ assert_equal 3, received.scan("foo\n").size
+ assert_equal ["ack\n","ack\n","ack\n"], responses
+ end
+
+ test 'does not resolve name of client address in default' do
+ received = ""
+ sources = []
+ @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn|
+ received << data
+ sources << conn.remote_host
+ end
+ 3.times do
+ # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock|
+ open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock|
+ sock.puts "yay"
+ end
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 12 }
+ assert_equal 3, received.scan("yay\n").size
+ assert{ sources.all?{|s| s == "127.0.0.1" } }
+ end
- # test 'can accept all keyword arguments valid for tcp/tls server'
- # test 'creates a tls server just to read data'
- # test 'creates a tls server to read and write data'
- # test 'creates a tls server to read and write data using IPv6'
+ test 'does resolve name of client address if resolve_name is true' do
+ hostname = Socket.getnameinfo([nil, nil, nil, "127.0.0.1"])[0]
- # many tests about certops
+ received = ""
+ sources = []
+ @d.server_create_tls(:s, PORT, resolve_name: true, tls_options: @tls_options) do |data, conn|
+ received << data
+ sources << conn.remote_host
+ end
+ 3.times do
+ # open_tls_session('127.0.0.1', PORT, cert_path: @cert_path, hostname: @default_hostname) do |sock|
+ open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock|
+ sock.puts "yay"
+ end
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 12 }
+ assert_equal 3, received.scan("yay\n").size
+ assert{ sources.all?{|s| s == hostname } }
+ end
- # test 'does not resolve name of client address in default'
- # test 'does resolve name of client address if resolve_name is true'
- # test 'can keep connections alive for tls if keepalive specified' do
- # pend "not implemented yet"
- # end
+ test 'can keep connections alive for tls if keepalive specified' do
+ # pend "not implemented yet"
+ end
- # test 'raises error if plugin registers data callback for connection object from #server_create'
- # test 'can call write_complete callback if registered'
- # test 'can call close callback if registered'
+ test 'raises error if plugin registers data callback for connection object from #server_create' do
+ received = ""
+ errors = []
+ @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn|
+ received << data
+ begin
+ conn.data{|d| received << d.upcase }
+ rescue => e
+ errors << e
+ end
+ end
+ open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock|
+ sock.puts "foo"
+ end
+ waiting(10){ sleep 0.1 until received.bytesize == 4 || errors.size == 1 }
+ assert_equal "foo\n", received
+ assert_equal 1, errors.size
+ assert_equal "data callback can be registered just once, but registered twice", errors.first.message
+ end
+
+ test 'can call write_complete callback if registered' do
+ buffer = ""
+ lines = []
+ responses = []
+ response_completes = []
+ @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn|
+ conn.on(:write_complete){|c| response_completes << true }
+ buffer << data
+ if idx = buffer.index("\n")
+ lines << buffer.slice!(0,idx+1)
+ conn.write "ack\n"
+ end
+ end
+ 3.times do
+ open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock|
+ sock.write "yay"
+ sock.write "foo\n"
+ begin
+ responses << sock.readline
+ rescue EOFError, IOError, Errno::ECONNRESET
+ # ignore
+ end
+ sock.close
+ end
+ end
+ waiting(10){ sleep 0.1 until lines.size == 3 && response_completes.size == 3 }
+ assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines
+ assert_equal ["ack\n","ack\n","ack\n"], responses
+ assert_equal [true, true, true], response_completes
+ end
+
+ test 'can call close callback if registered' do
+ buffer = ""
+ lines = []
+ callback_results = []
+ @d.server_create_tls(:s, PORT, tls_options: @tls_options) do |data, conn|
+ conn.on(:close){|c| callback_results << "closed" }
+ buffer << data
+ if idx = buffer.index("\n")
+ lines << buffer.slice!(0,idx+1)
+ conn.write "ack\n"
+ end
+ end
+ 3.times do
+ open_tls_session('127.0.0.1', PORT, cert_path: @cert_path) do |sock|
+ sock.write "yay"
+ sock.write "foo\n"
+ begin
+ while line = sock.readline
+ if line == "ack\n"
+ sock.close
+ end
+ end
+ rescue EOFError, IOError, Errno::ECONNRESET
+ # ignore
+ end
+ end
+ end
+ waiting(10){ sleep 0.1 until lines.size == 3 && callback_results.size == 3 }
+ assert_equal ["yayfoo\n", "yayfoo\n", "yayfoo\n"], lines
+ assert_equal ["closed", "closed", "closed"], callback_results
+ end
end
sub_test_case '#server_create_unix' do
@@ -729,6 +1403,22 @@ class Dummy < Fluent::Plugin::TestBase
# test 'can call close callback if registered'
end
+ def open_client(proto, addr, port)
+ client = case proto
+ when :tcp
+ TCPSocket.open(addr, port)
+ when :tls
+ c = OpenSSL::SSL::SSLSocket.new(TCPSocket.open(addr, port))
+ c.sync_close = true
+ c.connect
+ else
+ raise ArgumentError, "unknown proto:#{proto}"
+ end
+ yield client
+ ensure
+ client.close rescue nil
+ end
+
# run tests for tcp, tls and unix
sub_test_case '#server_create_connection' do
test 'raise error if udp is specified in proto' do
@@ -737,10 +1427,10 @@ class Dummy < Fluent::Plugin::TestBase
end
end
- # def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, certopts: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block)
+ # def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, tls_options: nil, resolve_name: false, linger_timeout: 0, backlog: nil, &block)
protocols = {
'tcp' => [:tcp, {}],
- # 'tls' => [:tls, {certopts: {}}],
+ 'tls' => [:tls, {tls_options: {insecure: true}}],
# 'unix' => [:unix, {path: ""}],
}
@@ -766,7 +1456,7 @@ class Dummy < Fluent::Plugin::TestBase
end
end
3.times do
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
sock.puts "yay"
end
end
@@ -788,7 +1478,7 @@ class Dummy < Fluent::Plugin::TestBase
end
end
3.times do
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
sock.puts "yay"
end
end
@@ -816,20 +1506,26 @@ class Dummy < Fluent::Plugin::TestBase
end
replied = []
disconnecteds = []
- 3.times do
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ 3.times do |i|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
sock.puts "yay"
while line = sock.readline
replied << line
break
end
sock.write "x"
+ connection_closed = false
begin
- sock.read
+ data = sock.read
+ if data.empty?
+ connection_closed = true
+ end
rescue => e
if e.is_a?(Errno::ECONNRESET)
- disconnecteds << e.class
+ connection_closed = true
end
+ ensure
+ disconnecteds << connection_closed
end
end
end
@@ -838,7 +1534,7 @@ class Dummy < Fluent::Plugin::TestBase
waiting(10){ sleep 0.1 until disconnecteds.size == 3 }
assert_equal ["yay\n", "yay\n", "yay\n"], lines
assert_equal ["foo!\n", "foo!\n", "foo!\n"], replied
- assert_equal [Errno::ECONNRESET, Errno::ECONNRESET, Errno::ECONNRESET], disconnecteds
+ assert_equal [true, true, true], disconnecteds
end
data(protocols)
@@ -860,7 +1556,7 @@ class Dummy < Fluent::Plugin::TestBase
end
replied = []
3.times do
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
sock.puts "yay"
while line = sock.readline
replied << line
@@ -888,7 +1584,7 @@ class Dummy < Fluent::Plugin::TestBase
end
end
3.times do
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
sock.puts "yay"
end
end
@@ -901,7 +1597,7 @@ class Dummy < Fluent::Plugin::TestBase
test 'will refuse more connect requests after stop, but read data from sockets already connected, in non-shared server' do |(proto, kwargs)|
connected = false
begin
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
# expected behavior is connection refused...
connected = true
end
@@ -919,7 +1615,7 @@ class Dummy < Fluent::Plugin::TestBase
end
th0 = Thread.new do
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
sock.puts "yay"
sock.readline
end
@@ -928,14 +1624,12 @@ class Dummy < Fluent::Plugin::TestBase
value0 = waiting(5){ th0.value }
assert_equal "ack\n", value0
- # TODO: change how to create clients by proto
-
stopped = false
sleeping = false
ending = false
th1 = Thread.new do
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
sleeping = true
sleep 0.1 until stopped
sock.puts "yay"
@@ -958,7 +1652,7 @@ class Dummy < Fluent::Plugin::TestBase
th2 = Thread.new do
begin
- TCPSocket.open("127.0.0.1", PORT) do |sock|
+ open_client(proto, "127.0.0.1", PORT) do |sock|
sock.puts "foo"
end
false # failed