From ad8fda7e7fcc0195579525902bd6835683be7e9a Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 15 Dec 2016 19:28:11 +0900 Subject: [PATCH] adding root_dir system config param and plugin_root_dir method for plugins --- lib/fluent/plugin/base.rb | 4 +++ lib/fluent/plugin/buf_file.rb | 2 +- lib/fluent/plugin/storage_local.rb | 1 + lib/fluent/plugin_id.rb | 12 ++++++++ lib/fluent/supervisor.rb | 27 ++++++++++++++++-- lib/fluent/system_config.rb | 45 ++++++++++++++++-------------- 6 files changed, 67 insertions(+), 24 deletions(-) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 4d3d87e6f1..2c4dbd8725 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -39,6 +39,10 @@ def has_router? false end + def plugin_root_dir + nil # override this in plugin_id.rb + end + def configure(conf) super @_state ||= State.new(false, false, false, false, false, false, false, false, false) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index eb547d8e29..6538276b89 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -32,7 +32,7 @@ class FileBuffer < Fluent::Plugin::Buffer DIR_PERMISSION = 0755 - # TODO: buffer_path based on system config + # TODO: plugin_root_dir desc 'The path where buffer chunks are stored.' config_param :path, :string diff --git a/lib/fluent/plugin/storage_local.rb b/lib/fluent/plugin/storage_local.rb index a431453f9b..03217abbf7 100644 --- a/lib/fluent/plugin/storage_local.rb +++ b/lib/fluent/plugin/storage_local.rb @@ -28,6 +28,7 @@ class LocalStorage < Storage DEFAULT_DIR_MODE = 0755 DEFAULT_FILE_MODE = 0644 + # TODO: plugin_root_dir config_param :path, :string, default: nil config_param :mode, :integer, default: DEFAULT_FILE_MODE config_param :dir_mode, :integer, default: DEFAULT_DIR_MODE diff --git a/lib/fluent/plugin_id.rb b/lib/fluent/plugin_id.rb index d48c77eb57..1e09aa4885 100644 --- a/lib/fluent/plugin_id.rb +++ b/lib/fluent/plugin_id.rb @@ -30,6 +30,7 @@ def configure(conf) end @@configured_ids.add(@id) end + @_plugin_root_dir = nil super end @@ -59,5 +60,16 @@ def plugin_id "object:#{object_id.to_s(16)}" end end + + def plugin_root_dir + return @_plugin_root_dir if @_plugin_root_dir + return nil unless system_config.root_dir + return nil unless plugin_id_configured? + worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i + dir = File.join(system_config.root_dir, "worker#{worker_id}", plugin_id) + FileUtils.mkdir_p(dir) unless Dir.exist?(dir) + @_plugin_root_dir = dir.freeze + dir + end end end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 5902a123e4..5af74157e2 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -16,6 +16,7 @@ require 'etc' require 'fcntl' +require 'fileutils' require 'fluent/config' require 'fluent/env' @@ -192,7 +193,10 @@ def supervisor_get_dump_config_handler module WorkerModule def spawn(process_manager) main_cmd = config[:main_cmd] - @pm = process_manager.spawn(*main_cmd) + env = { + 'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s, + } + @pm = process_manager.spawn(env, *main_cmd) end def after_start @@ -226,6 +230,7 @@ def self.load_config(path, params = {}) fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config']) system_config = SystemConfig.create(fluentd_conf) + root_dir = system_config.root_dir || params['root_dir'] log_level = system_config.log_level || params['log_level'] suppress_repeated_stacktrace = system_config.suppress_repeated_stacktrace || params['suppress_repeated_stacktrace'] log_path = params['log_path'] @@ -264,6 +269,7 @@ def self.load_config(path, params = {}) auto_heartbeat: false, unrecoverable_exit_codes: [2], stop_immediately_at_unrecoverable_exit: true, + root_dir: root_dir, logger: logger, log: logger.out, log_path: log_path, @@ -365,6 +371,7 @@ def self.default_options setup_path: nil, chuser: nil, chgroup: nil, + root_dir: nil, suppress_interval: 0, suppress_repeated_stacktrace: true, without_source: false, @@ -393,6 +400,7 @@ def initialize(opt) @rpc_server = nil @process_name = nil + @root_dir = opt[:root_dir] @log_level = opt[:log_level] @log_rotate_age = opt[:log_rotate_age] @log_rotate_size = opt[:log_rotate_size] @@ -417,6 +425,20 @@ def run_supervisor read_config set_system_config + if @root_dir + if File.exist?(@root_dir) + unless Dir.exist?(@root_dir) + raise Fluent::InvalidRootDirectory, "non directory entry exists:#{@root_dir}" + end + else + begin + FileUtils.mkdir_p(@root_dir) + rescue => e + raise Fluent::InvalidRootDirectory, "failed to create root directory:#{@root_dir}, #{e.inspect}" + end + end + end + dry_run if @dry_run supervise end @@ -426,7 +448,8 @@ def options 'config_path' => @config_path, 'pid_file' => @daemonize, 'plugin_dirs' => @plugin_dirs, - 'log_path' => @log_path + 'log_path' => @log_path, + 'root_dir' => @root_dir, } end diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 971112c965..cf26b1a504 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -21,6 +21,14 @@ module Fluent class SystemConfig include Configurable + SYSTEM_CONFIG_PARAMETERS = [ + :root_dir, :log_level, + :suppress_repeated_stacktrace, :emit_error_log_interval, :suppress_config_dump, + :without_source, :rpc_endpoint, :enable_get_dump, :process_name, + :file_permission, :dir_permission, + ] + + config_param :root_dir, :string, default: nil config_param :log_level, default: nil do |level| Log.str_to_level(level) end @@ -68,33 +76,28 @@ def initialize(conf=nil) def dup s = SystemConfig.new - s.log_level = @log_level - s.suppress_repeated_stacktrace = @suppress_repeated_stacktrace - s.emit_error_log_interval = @emit_error_log_interval - s.suppress_config_dump = @suppress_config_dump - s.without_source = @without_source - s.rpc_endpoint = @rpc_endpoint - s.enable_get_dump = @enable_get_dump - s.process_name = @process_name - s.file_permission = @file_permission - s.dir_permission = @dir_permission - + SYSTEM_CONFIG_PARAMETERS.each do |param| + s.__send__("#{param}=", instance_variable_get("@#{param}") + end s end def apply(supervisor) system = self supervisor.instance_eval { - @log.level = @log_level = system.log_level unless system.log_level.nil? - @suppress_interval = system.emit_error_log_interval unless system.emit_error_log_interval.nil? - @suppress_config_dump = system.suppress_config_dump unless system.suppress_config_dump.nil? - @suppress_repeated_stacktrace = system.suppress_repeated_stacktrace unless system.suppress_repeated_stacktrace.nil? - @without_source = system.without_source unless system.without_source.nil? - @rpc_endpoint = system.rpc_endpoint unless system.rpc_endpoint.nil? - @enable_get_dump = system.enable_get_dump unless system.enable_get_dump.nil? - @process_name = system.process_name unless system.process_name.nil? - @file_permission = system.file_permission unless system.file_permission.nil? - @dir_permission = system.dir_permission unless system.dir_permission.nil? + SYSTEM_CONFIG_PARAMETERS.each do |param| + param_value = system.send(param) + next if param_value.nil? + + case param + when :log_level + @log.level = @log_level = param_value + when :emit_error_log_interval + @suppress_interval = param_value + else + instance_variable_set("@#{param}", param_value) + end + end } end