From d1ba7d868d18b00f9947f17a316f137aead04456 Mon Sep 17 00:00:00 2001 From: "Satoshi \"Moris\" Tagomori" Date: Tue, 9 Feb 2016 16:48:55 +0900 Subject: [PATCH] Merge pull request #794 from fluent/separate-system-config Separate system config object from supervisor --- lib/fluent/configurable.rb | 3 +- lib/fluent/engine.rb | 14 +++-- lib/fluent/root_agent.rb | 7 ++- lib/fluent/supervisor.rb | 18 ++---- lib/fluent/system_config.rb | 92 +++++++++++++++++++++++++++++++ lib/fluent/test/base.rb | 2 +- test/config/test_system_config.rb | 10 ++-- test/test_root_agent.rb | 7 ++- 8 files changed, 121 insertions(+), 32 deletions(-) create mode 100644 lib/fluent/system_config.rb diff --git a/lib/fluent/configurable.rb b/lib/fluent/configurable.rb index a17008ed2a..262e062690 100644 --- a/lib/fluent/configurable.rb +++ b/lib/fluent/configurable.rb @@ -19,6 +19,7 @@ module Fluent require 'fluent/config/section' require 'fluent/config/error' require 'fluent/registry' + require 'fluent/plugin' module Configurable def self.included(mod) @@ -51,7 +52,7 @@ def configure(conf) conf.corresponding_proxies << proxy # In the nested section, can't get plugin class through proxies so get plugin class here - plugin_class = Plugin.lookup_name_from_class(proxy.name.to_s) + plugin_class = Fluent::Plugin.lookup_name_from_class(proxy.name.to_s) root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class) @config_root_section = root diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 48acb7c4df..3f0d7a27cf 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -18,6 +18,7 @@ module Fluent require 'fluent/event_router' require 'fluent/root_agent' require 'fluent/time' + require 'fluent/system_config' class EngineClass class DummyMessagePackFactory @@ -51,8 +52,11 @@ def initialize attr_reader :root_agent attr_reader :matches, :sources attr_reader :msgpack_factory + attr_reader :system_config + + def init(system_config) + @system_config = system_config - def init(opts = {}) BasicSocket.do_not_reverse_lookup = true Plugin.load_plugins if defined?(Encoding) @@ -60,11 +64,11 @@ def init(opts = {}) Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external) end - suppress_interval(opts[:suppress_interval]) if opts[:suppress_interval] - @suppress_config_dump = opts[:suppress_config_dump] if opts[:suppress_config_dump] - @without_source = opts[:without_source] if opts[:without_source] + suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil? + @suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil? + @without_source = system_config.without_source unless system_config.without_source.nil? - @root_agent = RootAgent.new(opts) + @root_agent = RootAgent.new(@system_config) self end diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index a0851c78b7..2445ec2ea7 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -19,6 +19,7 @@ module Fluent require 'fluent/agent' require 'fluent/label' + require 'fluent/system_config' # # Fluentd forms a tree structure to manage plugins: @@ -43,7 +44,7 @@ module Fluent class RootAgent < Agent ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label - def initialize(opts = {}) + def initialize(system_config = SystemConfig.new) super @labels = {} @@ -52,8 +53,8 @@ def initialize(opts = {}) @suppress_emit_error_log_interval = 0 @next_emit_error_log_time = nil - suppress_interval(opts[:suppress_interval]) if opts[:suppress_interval] - @without_source = opts[:without_source] if opts[:without_source] + suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil? + @without_source = system_config.without_source unless system_config.without_source.nil? end attr_reader :inputs diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 10299ba3ea..17f4130b52 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -15,6 +15,7 @@ # require 'fluent/load' +require 'fluent/system_config' require 'etc' module Fluent @@ -128,7 +129,8 @@ def start @log.init show_plugin_config if @show_plugin_config read_config - apply_system_config + @system_config = SystemConfig.create(@conf) # @conf is set in read_config + @system_config.apply(self) dry_run if @dry_run start_daemonize if @daemonize @@ -500,17 +502,6 @@ def apply(supervisor) end end - # TODO: this method should be moved to SystemConfig class method - def apply_system_config - systems = @conf.elements.select { |e| - e.name == 'system' - } - return if systems.empty? - raise ConfigError, " is duplicated. should be only one" if systems.size > 1 - - SystemConfig.new(systems.first).apply(self) - end - def run_configure Fluent::Engine.run_configure(@conf) end @@ -533,8 +524,7 @@ def change_privilege end def init_engine - init_opts = {:suppress_interval => @suppress_interval, :suppress_config_dump => @suppress_config_dump, :without_source => @without_source} - Fluent::Engine.init(init_opts) + Fluent::Engine.init(@system_config) @libs.each {|lib| require lib diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb new file mode 100644 index 0000000000..4452e9dd86 --- /dev/null +++ b/lib/fluent/system_config.rb @@ -0,0 +1,92 @@ +# +# Fluent +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + require 'fluent/configurable' + + module SystemConfigMixin + def system_config + @_system_config || Fluent::Engine.system_config + end + + def system_config_override(opts={}) + unless @_system_config + @_system_config = Fluent::Engine.system_config.dup + end + opts.each_pair do |key, value| + @_system_config.send(:"#{key.to_s}=", value) + end + end + end + + class SystemConfig + include Configurable + + config_param :log_level, default: nil do |level| + Log.str_to_level(level) + end + config_param :suppress_repeated_stacktrace, :bool, default: nil + config_param :emit_error_log_interval, :time, default: nil + config_param :suppress_config_dump, :bool, default: nil + config_param :without_source, :bool, default: nil + config_param :rpc_endpoint, :string, default: nil + config_param :enable_get_dump, :bool, default: nil + config_param :process_name, default: nil + + def self.create(conf) + systems = conf.elements.select { |e| + e.name == 'system' + } + return SystemConfig.new if systems.empty? + raise Fluent::ConfigError, " is duplicated. should be only one" if systems.size > 1 + + SystemConfig.new(systems.first) + end + + def initialize(conf={}) + super() + configure(conf) + end + + def dup + s = SystemConfig.new + s.log_level = @log_level + s.suppress_repeated_stacktrace = @suppress_repeated_stacktrace + s.emit_error_log_interval = @emit_error_log_interval + s.suppress_config_dump = @suppress_config_dump + s.without_source = @without_source + s.rpc_endpoint = @rpc_endpoint + s.enable_get_dump = @enable_get_dump + s.process_name = @process_name + + s + end + + def apply(supervisor) + system = self + supervisor.instance_eval { + @log.level = @log_level = system.log_level unless system.log_level.nil? + @suppress_interval = system.emit_error_log_interval unless system.emit_error_log_interval.nil? + @suppress_config_dump = system.suppress_config_dump unless system.suppress_config_dump.nil? + @suppress_repeated_stacktrace = system.suppress_repeated_stacktrace unless system.suppress_repeated_stacktrace.nil? + @without_source = system.without_source unless system.without_source.nil? + @rpc_endpoint = system.rpc_endpoint unless system.rpc_endpoint.nil? + @enable_get_dump = system.enable_get_dump unless system.enable_get_dump.nil? + @process_name = system.process_name unless system.process_name.nil? + } + end + end +end diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb index 8d74878739..8c40ac77ce 100644 --- a/lib/fluent/test/base.rb +++ b/lib/fluent/test/base.rb @@ -18,7 +18,7 @@ module Fluent module Test def self.setup Fluent.__send__(:remove_const, :Engine) - engine = Fluent.const_set(:Engine, EngineClass.new).init + engine = Fluent.const_set(:Engine, EngineClass.new).init(SystemConfig.new) engine.define_singleton_method(:now=) {|n| @now = n.to_i diff --git a/test/config/test_system_config.rb b/test/config/test_system_config.rb index 0775415d67..e4cfb13f01 100644 --- a/test/config/test_system_config.rb +++ b/test/config/test_system_config.rb @@ -2,7 +2,7 @@ require 'fluent/configurable' require 'fluent/config/element' require 'fluent/config/section' -require 'fluent/supervisor' +require 'fluent/system_config' module Fluent::Config class FakeLoggerInitializer @@ -36,7 +36,7 @@ def parse_text(text) EOS s = FakeSupervisor.new - sc = Fluent::Supervisor::SystemConfig.new(conf) + sc = Fluent::SystemConfig.new(conf) sc.apply(s) assert_nil(sc.log_level) assert_nil(sc.suppress_repeated_stacktrace) @@ -63,7 +63,7 @@ def parse_text(text) EOS s = FakeSupervisor.new - sc = Fluent::Supervisor::SystemConfig.new(conf) + sc = Fluent::SystemConfig.new(conf) sc.apply(s) assert_not_nil(sc.instance_variable_get("@#{k}")) key = (k == 'emit_error_log_interval' ? 'suppress_interval' : k) @@ -74,7 +74,7 @@ def parse_text(text) {'foo' => 'bar', 'hoge' => 'fuga'}.each { |k, v| test "should not affect settable parameters with unknown #{k} parameter" do s = FakeSupervisor.new - sc = Fluent::Supervisor::SystemConfig.new({k => v}) + sc = Fluent::SystemConfig.new({k => v}) sc.apply(s) assert_nil(s.instance_variable_get(:@log_level)) assert_nil(s.instance_variable_get(:@suppress_repeated_stacktrace)) @@ -91,7 +91,7 @@ def parse_text(text) EOS s = FakeSupervisor.new - sc = Fluent::Supervisor::SystemConfig.new(conf) + sc = Fluent::SystemConfig.new(conf) sc.apply(s) assert_equal(Fluent::Log::LEVEL_WARN, s.instance_variable_get("@log").level) end diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index 0b7cc784ed..50ff51b55e 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -1,4 +1,5 @@ require 'fluent/event_router' +require 'fluent/system_config' require_relative 'test_plugin_classes' class RootAgentTest < ::Test::Unit::TestCase @@ -12,12 +13,12 @@ def test_initialize end data( - 'suppress interval' => [{:suppress_interval => 30}, {:@suppress_emit_error_log_interval => 30}], - 'without source' => [{:without_source => true}, {:@without_source => true}] + 'suppress interval' => [{'emit_error_log_interval' => 30}, {:@suppress_emit_error_log_interval => 30}], + 'without source' => [{'without_source' => true}, {:@without_source => true}] ) def test_initialize_with_opt(data) opt, expected = data - ra = RootAgent.new(opt) + ra = RootAgent.new(SystemConfig.new(opt)) expected.each { |k, v| assert_equal v, ra.instance_variable_get(k) }