Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Superviser.load_config() #4086

Merged
merged 1 commit into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions lib/fluent/daemon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,5 @@

server_module = Fluent.const_get(ARGV[0])
worker_module = Fluent.const_get(ARGV[1])
# it doesn't call ARGV in block because when reloading config, params will be initialized and then it can't use previous config.
config_path = ARGV[2]
params = JSON.parse(ARGV[3])
ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.load_config(config_path, params) }
params = JSON.parse(ARGV[2])
ashie marked this conversation as resolved.
Show resolved Hide resolved
ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.serverengine_config(params) }
1 change: 0 additions & 1 deletion lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def time_format=(time_fmt)
end

def reopen!
# do nothing in @logger.reopen! because it's already reopened in Supervisor.load_config
@logger.reopen! if @logger
nil
end
Expand Down
40 changes: 9 additions & 31 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -424,22 +424,7 @@ def dump
end

class Supervisor
# For ServerEngine's `reload_config`.
# This is called only at the initilization of the supervisor process,
# since Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and have own
# reloading feature.
def self.load_config(path, params = {})
pre_loadtime = 0
pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
pre_config_mtime = nil
pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
config_mtime = File.mtime(path)

# reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
if (Time.now - Time.at(pre_loadtime) < 5) && (config_mtime == pre_config_mtime)
return params['pre_conf']
end

def self.serverengine_config(params = {})
# ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
pid_path = params['daemonize']
daemonize = !!params['daemonize']
Expand Down Expand Up @@ -470,28 +455,18 @@ def self.load_config(path, params = {})
File.join(File.dirname(__FILE__), 'daemon.rb'),
ServerModule.name,
WorkerModule.name,
path,
JSON.dump(params)],
command_sender: Fluent.windows? ? "pipe" : "signal",
config_path: params['fluentd_conf_path'],
fluentd_conf: params['fluentd_conf'],
conf_encoding: params['conf_encoding'],
inline_config: params['inline_config'],
config_path: path,
main_cmd: params['main_cmd'],
signame: params['signame'],
disable_shared_socket: params['disable_shared_socket'],
restart_worker_interval: params['restart_worker_interval'],
}
if daemonize
se_config[:pid_path] = pid_path
end
pre_params = params.dup
params['pre_loadtime'] = Time.now.to_i
params['pre_config_mtime'] = config_mtime
params['pre_conf'] = se_config
# prevent pre_conf from being too big by reloading many times.
pre_params['pre_conf'] = nil
params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)
se_config[:pid_path] = pid_path if daemonize

se_config
end
Expand Down Expand Up @@ -850,10 +825,11 @@ def supervise
'inline_config' => @inline_config,
'chuser' => @chuser,
'chgroup' => @chgroup,
'fluentd_conf_path' => @config_path,
'fluentd_conf' => @conf.to_s,
'use_v1_config' => @use_v1_config,
'conf_encoding' => @conf_encoding,
'signame' => @signame,
'fluentd_conf' => @conf.to_s,

'workers' => @system_config.workers,
'root_dir' => @system_config.root_dir,
Expand All @@ -866,8 +842,10 @@ def supervise
'restart_worker_interval' => @system_config.restart_worker_interval,
}

se = ServerEngine.create(ServerModule, WorkerModule){
Fluent::Supervisor.load_config(@config_path, params)
se = ServerEngine.create(ServerModule, WorkerModule) {
# Note: This is called only at the initialization of ServerEngine, since
# Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and have own reloading feature.
Fluent::Supervisor.serverengine_config(params)
}

se.run
Expand Down
112 changes: 10 additions & 102 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -517,31 +517,15 @@ def server.config
assert_equal('{"ok":true}', response)
end

def test_load_config
tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf"
conf_info_str = %[
<system>
log_level info
</system>
]
conf_debug_str = %[
<system>
log_level debug
</system>
]
now = Time.now
Timecop.freeze(now)

write_config tmp_dir, conf_info_str

def test_serverengine_config
params = {}
params['workers'] = 1
params['fluentd_conf_path'] = "fluentd.conf"
params['use_v1_config'] = true
params['log_level'] = Fluent::Log::LEVEL_INFO
params['conf_encoding'] = 'utf-8'
load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }
params['log_level'] = Fluent::Log::LEVEL_INFO
load_config_proc = Proc.new { Fluent::Supervisor.serverengine_config(params) }

# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal 'spawn', se_config[:worker_type]
Expand All @@ -551,68 +535,21 @@ def test_load_config
assert_equal false, se_config[:log_stderr]
assert_equal true, se_config[:enable_heartbeat]
assert_equal false, se_config[:auto_heartbeat]
assert_equal "fluentd.conf", se_config[:config_path]
assert_equal false, se_config[:daemonize]
assert_nil se_config[:pid_path]

# second call immediately(reuse config)
se_config = load_config_proc.call
pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
assert_nil pre_config_mtime
assert_nil pre_loadtime

Timecop.freeze(now + 5)

# third call after 5 seconds(don't reuse config)
se_config = load_config_proc.call
pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
assert_not_nil pre_config_mtime
assert_not_nil pre_loadtime

# forth call immediately(reuse config)
se_config = load_config_proc.call
# test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config
assert_equal pre_config_mtime, se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
assert_equal pre_loadtime, se_config[:windows_daemon_cmdline][5]['pre_loadtime']

write_config tmp_dir, conf_debug_str

# fifth call after changed conf file(don't reuse config)
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
ensure
Timecop.return
end

def test_load_config_for_daemonize
tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf"
conf_info_str = %[
<system>
log_level info
</system>
]
conf_debug_str = %[
<system>
log_level debug
</system>
]

now = Time.now
Timecop.freeze(now)

write_config tmp_dir, conf_info_str

def test_serverengine_config_for_daemonize
params = {}
params['workers'] = 1
params['fluentd_conf_path'] = "fluentd.conf"
params['use_v1_config'] = true
params['log_path'] = 'test/tmp/supervisor/log'
params['conf_encoding'] = 'utf-8'
params['log_level'] = Fluent::Log::LEVEL_INFO
params['daemonize'] = './fluentd.pid'
params['conf_encoding'] = 'utf-8'
load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }
load_config_proc = Proc.new { Fluent::Supervisor.serverengine_config(params) }

# first call
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
assert_equal 'spawn', se_config[:worker_type]
Expand All @@ -622,38 +559,9 @@ def test_load_config_for_daemonize
assert_equal false, se_config[:log_stderr]
assert_equal true, se_config[:enable_heartbeat]
assert_equal false, se_config[:auto_heartbeat]
assert_equal "fluentd.conf", se_config[:config_path]
assert_equal true, se_config[:daemonize]
assert_equal './fluentd.pid', se_config[:pid_path]

# second call immediately(reuse config)
se_config = load_config_proc.call
pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
assert_nil pre_config_mtime
assert_nil pre_loadtime

Timecop.freeze(now + 5)

# third call after 6 seconds(don't reuse config)
se_config = load_config_proc.call
pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
assert_not_nil pre_config_mtime
assert_not_nil pre_loadtime

# forth call immediately(reuse config)
se_config = load_config_proc.call
# test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config
assert_equal pre_config_mtime, se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
assert_equal pre_loadtime, se_config[:windows_daemon_cmdline][5]['pre_loadtime']

write_config tmp_dir, conf_debug_str

# fifth call after changed conf file(don't reuse config)
se_config = load_config_proc.call
assert_equal Fluent::Log::LEVEL_INFO, se_config[:log_level]
ensure
Timecop.return
end

data("supervisor", { supervise: true })
Expand Down