From 1b52de8ad8a564e359dc0d0714b2b8be4430e50f Mon Sep 17 00:00:00 2001
From: Daijiro Fukuda <fukuda@clear-code.com>
Date: Wed, 27 Nov 2024 11:50:54 +0900
Subject: [PATCH] SIGUSR2: Restart new process with zero downtime

This replaces the current `SIGUSR2` (#2716) with the new feature.
(Not supported on Windows).

* Restart the new process with zero downtime

The primary motivation is to enable the update of Fluentd
without data loss of plugins such as `in_udp`.

Specification:

* 2 ways to trigger this feature (non-Windows):
  * Signal: `SIGUSR2` to the supervisor.
    * Sending `SIGUSR2` to the workers triggers the traditional
      GracefulReload.
      * (Leave the traditional way, just in case)
  * RPC: `/api/processes.zeroDowntimeRestart`
    * Leave `/api/config.gracefulReload` for the traditional feature.
* This starts the new supervisor and workers with zero downtime
  for some plugins.
  * Input plugins with `zero_downtime_restart` supported work in
    parallel.
    * Supported input plugins:
      * `in_tcp`
      * `in_udp`
      * `in_syslog`
  * The old processes stop after 10s.
* The new supervisor works in `source-only` mode (#4661)
  until the old processes stop.
  * After the old processes stop, the data handled by the new
    processes are loaded and processed.
  * If need, you can configure `source_only_buffer` (see #4661).
* Windows: Not affected at all. Remains the traditional
  GracefulReload.

Mechanism:

1. The supervisor receives SIGUSR2.
2. Spawn a new supervisor.
3. Take over shared sockets.
4. Launch new workers, and stop old processes in parallel.
   * Launch new workers with source-only mode
     * Limit to zero_downtime_restart_ready? input plugin
   * Send SIGTERM to the old supervisor after 10s delay from 3.
5. The old supervisor stops and sends SIGWINCH to the new one.
6. The new workers run fully.

Note: need these feature

* https://github.com/fluent/fluentd/pull/4661
* https://github.com/treasure-data/serverengine/pull/146

Conditions under which `zero_downtime_restart_ready?` can be enabled:

* Must be able to work in parallel with another Fluentd instance.
* Notes:
  * The sockets provided by server helper are shared with the
    new Fluentd instance.
  * Input plugins managing a position such as `in_tail` should
    not enable its `zero_downtime_restart_ready?`.
    * Such input plugins do not cause data loss on restart, so
      there is no need to enable this in the first place.
  * `in_http` and `in_forward` could also be supported.
    Not supporting them this time is simply a matter of time to
    consider.

The appropriateness of replacing the traditional SIGUSR2:

* The traditional SIGUSR2 feature has some limitations and issues.
  * Limitations:
    1. A change to system_config is ignored because it needs to
       restart(kill/spawn) process.
    2. All plugins must not use class variable when restarting.
  * Issues:
    * #2259
    * #3469
    * #3549
* This new feature allows restarts without downtime and such
  limitations.
  * Although supported plugins are limited, that is not a
    problem for many plugins.
    (The problem is with server-based input plugins where the
    stop results in data loss).
* This new feature has a big advantage that it can also be used
  to update Fluentd.
  * In the future, fluent-package will use this feature to allow
    update with zero downtime by default.
* If needed, we can still use the traditional feature by RPC or
  directly sending `SIGUSR2` to the workers.

Co-authored-by: Shizuo Fujita <fujita@clear-code.com>
Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
---
 lib/fluent/engine.rb           |   4 +-
 lib/fluent/plugin/in_syslog.rb |   4 +
 lib/fluent/plugin/in_tcp.rb    |   4 +
 lib/fluent/plugin/in_udp.rb    |   4 +
 lib/fluent/plugin/input.rb     |   4 +
 lib/fluent/root_agent.rb       |  50 +++++++--
 lib/fluent/supervisor.rb       | 196 ++++++++++++++++++++++++++++-----
 test/command/test_fluentd.rb   | 178 ++++++++++++++++++++++++++++++
 test/test_plugin_classes.rb    |   8 ++
 test/test_root_agent.rb        | 122 +++++++++++++++++++-
 test/test_supervisor.rb        | 123 ++++++++++++++++++++-
 11 files changed, 653 insertions(+), 44 deletions(-)

diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb
index b6677c6443..263d1cfc5b 100644
--- a/lib/fluent/engine.rb
+++ b/lib/fluent/engine.rb
@@ -51,7 +51,7 @@ def initialize
 
     attr_reader :root_agent, :system_config, :supervisor_mode
 
-    def init(system_config, supervisor_mode: false)
+    def init(system_config, supervisor_mode: false, start_in_parallel: false)
       @system_config = system_config
       @supervisor_mode = supervisor_mode
 
@@ -60,7 +60,7 @@ def init(system_config, supervisor_mode: false)
 
       @log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil?
 
-      @root_agent = RootAgent.new(log: log, system_config: @system_config)
+      @root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel)
 
       self
     end
diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb
index 28ad1c5dd9..3d4b205c98 100644
--- a/lib/fluent/plugin/in_syslog.rb
+++ b/lib/fluent/plugin/in_syslog.rb
@@ -156,6 +156,10 @@ def multi_workers_ready?
       true
     end
 
+    def zero_downtime_restart_ready?
+      true
+    end
+
     def start
       super
 
diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb
index bd2ea83e5b..9ecb6b793d 100644
--- a/lib/fluent/plugin/in_tcp.rb
+++ b/lib/fluent/plugin/in_tcp.rb
@@ -101,6 +101,10 @@ def multi_workers_ready?
       true
     end
 
+    def zero_downtime_restart_ready?
+      true
+    end
+
     def start
       super
 
diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb
index c2d436115f..645adf8f08 100644
--- a/lib/fluent/plugin/in_udp.rb
+++ b/lib/fluent/plugin/in_udp.rb
@@ -65,6 +65,10 @@ def multi_workers_ready?
       true
     end
 
+    def zero_downtime_restart_ready?
+      true
+    end
+
     def start
       super
 
diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb
index 7a6909f7a9..d465ed3a22 100644
--- a/lib/fluent/plugin/input.rb
+++ b/lib/fluent/plugin/input.rb
@@ -70,6 +70,10 @@ def metric_callback(es)
       def multi_workers_ready?
         false
       end
+
+      def zero_downtime_restart_ready?
+        false
+      end
     end
   end
 end
diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb
index c66ccb489c..0a7a34b1cb 100644
--- a/lib/fluent/root_agent.rb
+++ b/lib/fluent/root_agent.rb
@@ -48,7 +48,35 @@ module Fluent
   class RootAgent < Agent
     ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label
 
-    def initialize(log:, system_config: SystemConfig.new)
+    class SourceOnlyMode
+      DISABELD = 0
+      NORMAL = 1
+      ONLY_ZERO_DOWNTIME_RESTART_READY = 2
+
+      def initialize(with_source_only, start_in_parallel)
+        if start_in_parallel
+          @mode = ONLY_ZERO_DOWNTIME_RESTART_READY
+        elsif with_source_only
+          @mode = NORMAL
+        else
+          @mode = DISABELD
+        end
+      end
+
+      def enabled?
+        @mode != DISABELD
+      end
+
+      def only_zero_downtime_restart_ready?
+        @mode == ONLY_ZERO_DOWNTIME_RESTART_READY
+      end
+
+      def disable!
+        @mode = DISABELD
+      end
+    end
+
+    def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false)
       super(log: log)
 
       @labels = {}
@@ -56,7 +84,7 @@ def initialize(log:, system_config: SystemConfig.new)
       @suppress_emit_error_log_interval = 0
       @next_emit_error_log_time = nil
       @without_source = system_config.without_source || false
-      @with_source_only = system_config.with_source_only || false
+      @source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel)
       @source_only_buffer_agent = nil
       @enable_input_metrics = system_config.enable_input_metrics || false
 
@@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new)
     attr_reader :labels
 
     def source_only_router
-      raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only
+      raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.enabled?
       @source_only_buffer_agent.event_router
     end
 
@@ -154,7 +182,7 @@ def configure(conf)
 
       super
 
-      setup_source_only_buffer_agent if @with_source_only
+      setup_source_only_buffer_agent if @source_only_mode.enabled?
 
       # initialize <source> elements
       if @without_source
@@ -187,9 +215,12 @@ def cleanup_source_only_buffer_agent
     end
 
     def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
+      only_zero_downtime_restart_ready = false
+
       unless kind_or_agent_list
-        if @with_source_only
+        if @source_only_mode.enabled?
           kind_or_agent_list = [:input, @source_only_buffer_agent]
+          only_zero_downtime_restart_ready = @source_only_mode.only_zero_downtime_restart_ready?
         elsif @source_only_buffer_agent
           # source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
           kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten
@@ -214,6 +245,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
                  end
           display_kind = (kind == :output_with_router ? :output : kind)
           list.each do |instance|
+            if only_zero_downtime_restart_ready
+              next unless instance.respond_to?(:zero_downtime_restart_ready?) and instance.zero_downtime_restart_ready?
+            end
             yield instance, display_kind
           end
         end
@@ -257,7 +291,7 @@ def flush!
     end
 
     def cancel_source_only!
-      unless @with_source_only
+      unless @source_only_mode.enabled?
         log.info "do nothing for canceling with-source-only because the current mode is not with-source-only."
         return
       end
@@ -285,7 +319,7 @@ def cancel_source_only!
       setup_source_only_buffer_agent(flush: true)
       start(kind_or_agent_list: [@source_only_buffer_agent])
 
-      @with_source_only = false
+      @source_only_mode.disable!
     end
 
     def shutdown(kind_or_agent_list: nil)
@@ -378,7 +412,7 @@ def add_source(type, conf)
       # See also 'fluentd/plugin/input.rb'
       input.context_router = @event_router
       input.configure(conf)
-      input.event_emitter_apply_source_only if @with_source_only
+      input.event_emitter_apply_source_only if @source_only_mode.enabled?
       if @enable_input_metrics
         @event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
       end
diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb
index a76bd1d40a..1352112a0b 100644
--- a/lib/fluent/supervisor.rb
+++ b/lib/fluent/supervisor.rb
@@ -43,11 +43,16 @@ def before_run
       @rpc_endpoint = nil
       @rpc_server = nil
       @counter = nil
+      @socket_manager_server = nil
+      @starting_new_supervisor_with_zero_downtime = false
+      @new_supervisor_pid = nil
+      start_in_parallel = ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
+      @zero_downtime_restart_mutex = Mutex.new
 
       @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
       ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
 
-      if config[:rpc_endpoint]
+      if config[:rpc_endpoint] and not start_in_parallel
         @rpc_endpoint = config[:rpc_endpoint]
         @enable_get_dump = config[:enable_get_dump]
         run_rpc_server
@@ -59,16 +64,27 @@ def before_run
         install_supervisor_signal_handlers
       end
 
-      if counter = config[:counter_server]
+      if counter = config[:counter_server] and not start_in_parallel
         run_counter_server(counter)
       end
 
       if config[:disable_shared_socket]
         $log.info "shared socket for multiple workers is disabled"
+      elsif start_in_parallel
+        begin
+          raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
+          @socket_manager_server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
+          $log.info "zero-downtime-restart: took over the shared sockets", path: ENV['SERVERENGINE_SOCKETMANAGER_PATH']
+        rescue => e
+          $log.error "zero-downtime-restart: cancel sequence because failed to take over the shared sockets", error: e
+          raise
+        end
       else
-        server = ServerEngine::SocketManager::Server.open
-        ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
+        @socket_manager_server = ServerEngine::SocketManager::Server.open
+        ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
       end
+
+      stop_parallel_old_supervisor_after_delay if start_in_parallel
     end
 
     def after_run
@@ -76,7 +92,9 @@ def after_run
       stop_rpc_server if @rpc_endpoint
       stop_counter_server if @counter
       cleanup_lock_dir
-      Fluent::Supervisor.cleanup_resources
+      Fluent::Supervisor.cleanup_socketmanager_path unless @starting_new_supervisor_with_zero_downtime
+
+      notify_new_supervisor_that_old_one_has_stopped if @starting_new_supervisor_with_zero_downtime
     end
 
     def cleanup_lock_dir
@@ -109,6 +127,13 @@ def run_rpc_server
         end
         nil
       }
+      unless Fluent.windows?
+        @rpc_server.mount_proc('/api/processes.zeroDowntimeRestart') { |req, res|
+          $log.debug "fluentd RPC got /api/processes.zeroDowntimeRestart request"
+          Process.kill :USR2, Process.pid
+          nil
+        }
+      end
       @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res|
         $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
         if Fluent.windows?
@@ -137,27 +162,24 @@ def run_rpc_server
 
       @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
         $log.debug "fluentd RPC got /api/config.gracefulReload request"
-        if Fluent.windows?
-          supervisor_sigusr2_handler
-        else
-          Process.kill :USR2, Process.pid
-        end
-
+        graceful_reload
         nil
       }
 
-      @rpc_server.mount_proc('/api/config.getDump') { |req, res|
-        $log.debug "fluentd RPC got /api/config.getDump request"
-        $log.info "get dump in-memory config via HTTP"
-        res.body = supervisor_get_dump_config_handler
-        [nil, nil, res]
-      } if @enable_get_dump
+      if @enable_get_dump
+        @rpc_server.mount_proc('/api/config.getDump') { |req, res|
+          $log.debug "fluentd RPC got /api/config.getDump request"
+          $log.info "get dump in-memory config via HTTP"
+          res.body = supervisor_get_dump_config_handler
+          [nil, nil, res]
+        }
+      end
 
       @rpc_server.start
     end
 
     def stop_rpc_server
-      @rpc_server.shutdown
+      @rpc_server&.shutdown
     end
 
     def run_counter_server(counter_conf)
@@ -172,6 +194,44 @@ def stop_counter_server
       @counter.stop
     end
 
+    def stop_parallel_old_supervisor_after_delay
+      Thread.new do
+        # Delay to wait the new workers to start up.
+        # Even if it takes a long time to start the new workers and stop the old Fluentd first,
+        # it is no problem because the socket buffer works, as long as the capacity is not exceeded.
+        sleep 10
+        old_pid = ENV["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"]&.to_i
+        if old_pid
+          $log.info "zero-downtime-restart: stop the old supervisor"
+          Process.kill :TERM, old_pid
+        end
+      rescue => e
+        $log.warn "zero-downtime-restart: failed to stop the old supervisor." +
+                  " If the old one does not exist, please send SIGWINCH to this new process to start to work fully." +
+                  " If it exists, something went wrong. Please kill the old one manually.",
+                  error: e
+      end
+    end
+
+    def notify_new_supervisor_that_old_one_has_stopped
+      if config[:pid_path]
+        new_pid = File.read(config[:pid_path]).to_i
+      else
+        raise "[BUG] new_supervisor_pid is not saved" unless @new_supervisor_pid
+        new_pid = @new_supervisor_pid
+      end
+
+      $log.info "zero-downtime-restart: notify the new supervisor (pid: #{new_pid}) that old one has stopped"
+      Process.kill :WINCH, new_pid
+    rescue => e
+      $log.error(
+        "zero-downtime-restart: failed to notify the new supervisor." +
+        " Please send SIGWINCH to the new supervisor process manually" +
+        " if it does not start to work fully.",
+        error: e
+      )
+    end
+
     def install_supervisor_signal_handlers
       return if Fluent.windows?
 
@@ -187,7 +247,11 @@ def install_supervisor_signal_handlers
 
       trap :USR2 do
         $log.debug 'fluentd supervisor process got SIGUSR2'
-        supervisor_sigusr2_handler
+        if Fluent.windows?
+          graceful_reload
+        else
+          zero_downtime_restart
+        end
       end
 
       trap :WINCH do
@@ -259,7 +323,7 @@ def install_windows_event_handler
             when :usr1
               supervisor_sigusr1_handler
             when :usr2
-              supervisor_sigusr2_handler
+              graceful_reload
             when :cont
               supervisor_dump_handler_for_windows
             when :stop_event_thread
@@ -289,7 +353,7 @@ def supervisor_sigusr1_handler
       send_signal_to_workers(:USR1)
     end
 
-    def supervisor_sigusr2_handler
+    def graceful_reload
       conf = nil
       t = Thread.new do
         $log.info 'Reloading new config'
@@ -317,7 +381,76 @@ def supervisor_sigusr2_handler
       $log.error "Failed to reload config file: #{e}"
     end
 
+    def zero_downtime_restart
+      Thread.new do
+        @zero_downtime_restart_mutex.synchronize do
+          $log.info "start zero-downtime-restart sequence"
+
+          if @starting_new_supervisor_with_zero_downtime
+            $log.warn "zero-downtime-restart: canceled because it is already starting"
+            Thread.exit
+          end
+          if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
+            $log.warn "zero-downtime-restart: canceled because the previous sequence is still running"
+            Thread.exit
+          end
+
+          @starting_new_supervisor_with_zero_downtime = true
+          commands = [ServerEngine.ruby_bin_path, $0] + ARGV
+          env_to_add = {
+            "SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN,
+            "FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{Process.pid}",
+          }
+          pid = Process.spawn(env_to_add, commands.join(" "))
+          @new_supervisor_pid = pid unless config[:daemonize]
+
+          if config[:daemonize]
+            Thread.new(pid) do |pid|
+              _, status = Process.wait2(pid)
+              # check if `ServerEngine::Daemon#daemonize_with_double_fork` succeeded or not
+              unless status.success?
+                @starting_new_supervisor_with_zero_downtime = false
+                $log.error "zero-downtime-restart: failed because new supervisor exits unexpectedly"
+              end
+            end
+          else
+            Thread.new(pid) do |pid|
+              _, status = Process.wait2(pid)
+              @starting_new_supervisor_with_zero_downtime = false
+              $log.error "zero-downtime-restart: failed because new supervisor exits unexpectedly", status: status
+            end
+          end
+        end
+      rescue => e
+        $log.error "zero-downtime-restart: failed", error: e
+        @starting_new_supervisor_with_zero_downtime = false
+      end
+    end
+
     def cancel_source_only
+      if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
+        if config[:rpc_endpoint]
+          begin
+            @rpc_endpoint = config[:rpc_endpoint]
+            @enable_get_dump = config[:enable_get_dump]
+            run_rpc_server
+          rescue => e
+            $log.error "failed to start RPC server", error: e
+          end
+        end
+
+        if counter = config[:counter_server]
+          begin
+            run_counter_server(counter)
+          rescue => e
+            $log.error "failed to start counter server", error: e
+          end
+        end
+
+        $log.info "zero-downtime-restart: done all sequences, now new processes start to work fully"
+        ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
+      end
+
       send_signal_to_workers(:WINCH)
     end
 
@@ -510,12 +643,11 @@ def self.default_options
       }
     end
 
-    def self.cleanup_resources
-      unless Fluent.windows?
-        if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH')
-          FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
-        end
-      end
+    def self.cleanup_socketmanager_path
+      return if Fluent.windows?
+      return unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
+
+      FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
     end
 
     def initialize(cl_opt)
@@ -583,7 +715,7 @@ def run_supervisor(dry_run: false)
       begin
         ServerEngine::Privilege.change(@chuser, @chgroup)
         MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
-        Fluent::Engine.init(@system_config, supervisor_mode: true)
+        Fluent::Engine.init(@system_config, supervisor_mode: true, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"))
         Fluent::Engine.run_configure(@conf, dry_run: dry_run)
       rescue Fluent::ConfigError => e
         $log.error 'config error', file: @config_path, error: e
@@ -632,10 +764,10 @@ def run_worker
           File.umask(@chumask.to_i(8))
         end
         MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
-        Fluent::Engine.init(@system_config)
+        Fluent::Engine.init(@system_config, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"))
         Fluent::Engine.run_configure(@conf)
         Fluent::Engine.run
-        self.class.cleanup_resources if @standalone_worker
+        self.class.cleanup_socketmanager_path if @standalone_worker
         exit 0
       end
     end
@@ -853,6 +985,10 @@ def install_main_process_signal_handlers
         end
 
         trap :USR2 do
+          # Leave the old GracefulReload feature, just in case.
+          # We can send SIGUSR2 to the worker process to use this old GracefulReload feature.
+          # (Note: Normally, we can send SIGUSR2 to the supervisor process to use
+          #  zero-downtime-restart feature as GracefulReload on non-Windows.)
           reload_config
         end
 
diff --git a/test/command/test_fluentd.rb b/test/command/test_fluentd.rb
index 0b40b93c2a..4a394c38fb 100644
--- a/test/command/test_fluentd.rb
+++ b/test/command/test_fluentd.rb
@@ -1350,4 +1350,182 @@ def multi_workers_ready?; true; end
                          patterns_not_match: ["[error]"])
     end
   end
+
+  sub_test_case "zero_downtime_restart" do
+    setup do
+      omit "Not supported on Windows" if Fluent.windows?
+    end
+
+    def conf(udp_port, tcp_port, syslog_port)
+      <<~CONF
+        <system>
+          rpc_endpoint localhost:24444
+        </system>
+        <source>
+          @type monitor_agent
+        </source>
+        <source>
+          @type udp
+          tag test.udp
+          port #{udp_port}
+          <parse>
+            @type none
+          </parse>
+        </source>
+        <source>
+          @type tcp
+          tag test.tcp
+          port #{tcp_port}
+          <parse>
+            @type none
+          </parse>
+        </source>
+        <source>
+          @type syslog
+          tag test.syslog
+          port #{syslog_port}
+        </source>
+        <filter test.**>
+          @type record_transformer
+          <record>
+            foo foo
+          </record>
+        </filter>
+        <match test.**>
+          @type stdout
+        </match>
+      CONF
+    end
+
+    def run_fluentd(config)
+      conf_path = create_conf_file("test.conf", config)
+      assert File.exist?(conf_path)
+      cmdline = create_cmdline(conf_path)
+
+      stdio_buf = ""
+      execute_command(cmdline) do |pid, stdout|
+        begin
+          waiting(60) do
+            while true
+              readables, _, _ = IO.select([stdout], nil, nil, 1)
+              next unless readables
+              break if readables.first.eof?
+
+              buf = eager_read(readables.first)
+              stdio_buf << buf
+              logs = buf.split("\n")
+
+              yield logs
+
+              break if buf.include? "finish test"
+            end
+          end
+        ensure
+          supervisor_pids = stdio_buf.scan(SUPERVISOR_PID_PATTERN)
+          @supervisor_pid = supervisor_pids.last.first.to_i if supervisor_pids.size >= 2
+          stdio_buf.scan(WORKER_PID_PATTERN) do |worker_pid|
+            @worker_pids << worker_pid.first.to_i
+          end
+        end
+      end
+    end
+
+    def send_udp(port, count:, interval_sec:)
+      count.times do |i|
+        s = UDPSocket.new
+        s.send("udp-#{i}", 0, "localhost", port)
+        s.close
+        sleep interval_sec
+      end
+    end
+
+    def send_tcp(port, count:, interval_sec:)
+      count.times do |i|
+        s = TCPSocket.new("localhost", port)
+        s.write("tcp-#{i}\n")
+        s.close
+        sleep interval_sec
+      end
+    end
+
+    def send_syslog(port, count:, interval_sec:)
+      count.times do |i|
+        s = UDPSocket.new
+        s.send("<6>Sep 10 00:00:00 localhost test: syslog-#{i}", 0, "localhost", port)
+        s.close
+        sleep interval_sec
+      end
+    end
+
+    def send_end(port)
+      s = TCPSocket.new("localhost", port)
+      s.write("finish test\n")
+      s.close
+    end
+
+    test "should restart with zero downtime (no data loss)" do
+      udp_port, syslog_port = unused_port(2, protocol: :udp)
+      tcp_port = unused_port(protocol: :tcp)
+
+      client_threads = []
+      end_thread = nil
+      records_by_type = {
+        "udp" => [],
+        "tcp" => [],
+        "syslog" => [],
+      }
+
+      phase = "startup"
+      run_fluentd(conf(udp_port, tcp_port, syslog_port)) do |logs|
+        logs.each do |log|
+          next unless /"message":"(udp|tcp|syslog)-(\d+)","foo":"foo"}/ =~ log
+          type = $1
+          num = $2.to_i
+          assert_true records_by_type.key?(type)
+          records_by_type[type].append(num)
+        end
+
+        if phase == "startup" and logs.any? { |log| log.include?("fluentd worker is now running worker") }
+          phase = "zero-downtime-restart"
+
+          client_threads << Thread.new do
+            send_udp(udp_port, count: 500, interval_sec: 0.01)
+          end
+          client_threads << Thread.new do
+            send_tcp(tcp_port, count: 500, interval_sec: 0.01)
+          end
+          client_threads << Thread.new do
+            send_syslog(syslog_port, count: 500, interval_sec: 0.01)
+          end
+
+          sleep 1
+          response = Net::HTTP.get(URI.parse("http://localhost:24444/api/processes.zeroDowntimeRestart"))
+          assert_equal '{"ok":true}', response
+        elsif phase == "zero-downtime-restart" and logs.any? { |log| log.include?("zero-downtime-restart: done all sequences") }
+          phase = "flush"
+          response = Net::HTTP.get(URI.parse("http://localhost:24444/api/plugins.flushBuffers"))
+          assert_equal '{"ok":true}', response
+        elsif phase == "flush"
+          phase = "done"
+          end_thread = Thread.new do
+            client_threads.each(&:join)
+            sleep 5 # make sure to flush each chunk (1s flush interval for 1chunk)
+            send_end(tcp_port)
+          end
+        end
+      end
+
+      assert_equal(
+        [(0..499).to_a, (0..499).to_a, (0..499).to_a],
+        [
+          records_by_type["udp"].sort,
+          records_by_type["tcp"].sort,
+          records_by_type["syslog"].sort,
+        ]
+      )
+    ensure
+      client_threads.each(&:kill)
+      end_thread&.kill
+    end
+  end
 end
diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb
index ccd6a1469c..a973cfba73 100644
--- a/test/test_plugin_classes.rb
+++ b/test/test_plugin_classes.rb
@@ -106,6 +106,14 @@ def initialize
       @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new
     end
 
+    def multi_workers_ready?
+      true
+    end
+
+    def zero_downtime_restart_ready?
+      true
+    end
+
     def start
       super
       @started = true
diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb
index d10174d0f3..2d31ba886c 100644
--- a/test/test_root_agent.rb
+++ b/test/test_root_agent.rb
@@ -992,6 +992,10 @@ def setup
       @root_agent.configure(Config.parse(conf, "(test)", "(test_dir)"))
     end
 
+    def teardown
+      @root_agent.shutdown
+    end
+
     test 'only input plugins should start' do
       @root_agent.start
 
@@ -1008,9 +1012,9 @@ def setup
         }
       )
     ensure
-      @root_agent.shutdown
       # Buffer files remain because not cancelling source-only.
       # As a test, they should be clean-up-ed.
+      @root_agent.shutdown
       buf_dir = @root_agent.instance_variable_get(:@source_only_buffer_agent).instance_variable_get(:@base_buffer_dir)
       FileUtils.remove_dir(buf_dir)
     end
@@ -1031,8 +1035,6 @@ def setup
           "output started?" => @root_agent.outputs.map { |plugin| plugin.started? },
         }
       )
-    ensure
-      @root_agent.shutdown
     end
 
     test 'buffer should be loaded after #cancel_source_only!' do
@@ -1045,10 +1047,124 @@ def setup
         sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 0 }
       end
 
+      waiting(3) do
+        # Wait the last data output
+        sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 19 }
+      end
+
       # all data should be outputted
       assert { @root_agent.outputs[0].events["test.event"].size == 20 }
+    end
+  end
+
+  sub_test_case 'start_in_parallel' do
+    def conf
+      <<~EOC
+        <source>
+          @type test_in_gen
+          @id test_in_gen
+          num 20
+          interval_sec 0.1
+          async
+        </source>
+
+        <source>
+          @type test_in
+          @id test_in
+        </source>
+
+        <filter test.**>
+          @type record_transformer
+          @id record_transformer
+          <record>
+            foo foo
+          </record>
+        </filter>
+
+        <match test.**>
+          @type test_out
+          @id test_out
+        </match>
+      EOC
+    end
+
+    def setup
+      omit "Not supported on Windows" if Fluent.windows?
+      system_config = SystemConfig.new(
+        Config::Element.new('system', '', {}, [
+          Config::Element.new('source_only_buffer', '', {
+            'flush_interval' => 1,
+          }, []),
+        ])
+      )
+      @root_agent = RootAgent.new(log: $log, system_config: system_config, start_in_parallel: true)
+      stub(Engine).root_agent { @root_agent }
+      stub(Engine).system_config { system_config }
+      @root_agent.configure(Config.parse(conf, "(test)", "(test_dir)"))
+    end
+
+    def teardown
+      @root_agent.shutdown
+    end
+
+    test 'only input plugins should start' do
+      @root_agent.start
+
+      assert_equal(
+        {
+          "input started?" => [true, false],
+          "filter started?" => [false],
+          "output started?" => [false],
+        },
+        {
+          "input started?" => @root_agent.inputs.map { |plugin| plugin.started? },
+          "filter started?" => @root_agent.filters.map { |plugin| plugin.started? },
+          "output started?" => @root_agent.outputs.map { |plugin| plugin.started? },
+        }
+      )
     ensure
+      # Buffer files remain because not cancelling source-only.
+      # As a test, they should be clean-up-ed.
       @root_agent.shutdown
+      buf_dir = @root_agent.instance_variable_get(:@source_only_buffer_agent).instance_variable_get(:@base_buffer_dir)
+      FileUtils.remove_dir(buf_dir)
+    end
+
+    test '#cancel_source_only! should start all plugins' do
+      @root_agent.start
+      @root_agent.cancel_source_only!
+
+      assert_equal(
+        {
+          "input started?" => [true, true],
+          "filter started?" => [true],
+          "output started?" => [true],
+        },
+        {
+          "input started?" => @root_agent.inputs.map { |plugin| plugin.started? },
+          "filter started?" => @root_agent.filters.map { |plugin| plugin.started? },
+          "output started?" => @root_agent.outputs.map { |plugin| plugin.started? },
+        }
+      )
+    end
+
+    test 'buffer should be loaded after #cancel_source_only!' do
+      @root_agent.start
+      sleep 1
+      @root_agent.cancel_source_only!
+
+      waiting(3) do
+        # Wait buffer loaded after source-only cancelled
+        sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 0 }
+      end
+
+      waiting(3) do
+        # Wait the last data output
+        sleep 1 until @root_agent.outputs[0].events["test.event"].any? { |record| record["num"] == 19 }
+      end
+
+      # all data should be outputted
+      assert { @root_agent.outputs[0].events["test.event"].size == 20 }
     end
   end
 end
diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb
index 6d3363630e..28c20b2e39 100644
--- a/test/test_supervisor.rb
+++ b/test/test_supervisor.rb
@@ -19,7 +19,7 @@
 class SupervisorTest < ::Test::Unit::TestCase
   class DummyServer
     include Fluent::ServerModule
-    attr_accessor :rpc_endpoint, :enable_get_dump
+    attr_accessor :rpc_endpoint, :enable_get_dump, :socket_manager_server
     def config
       {}
     end
@@ -891,6 +891,127 @@ def server.config
     end
   end
 
+  sub_test_case "zero_downtime_restart" do
+    setup do
+      omit "Not supported on Windows" if Fluent.windows?
+    end
+
+    data(
+      # When daemonize, exit-status is important. The new spawned process does double-fork and exits soon.
+      "daemonize and succeeded double-fork of new process" => [true, true, 0, false],
+      "daemonize and failed double-fork of new process" => [true, false, 0, true],
+      # When no daemon, whether the new spawned process is alive is important, not exit-status.
+      "no daemon and new process alive" => [false, false, 3, false],
+      "no daemon and new process dead" => [false, false, 0, true],
+    )
+    def test_zero_downtime_restart((daemonize, wait_success, wait_sleep, restart_canceled))
+      # == Arrange ==
+      env_spawn = {}
+      pid_wait = nil
+
+      server = DummyServer.new
+
+      stub(server).config do
+        {
+          daemonize: daemonize,
+          pid_path: "test-pid-file",
+        }
+      end
+      process_stub = stub(Process)
+      process_stub.spawn do |env, commands|
+        env_spawn = env
+        -1
+      end
+      process_stub.wait2 do |pid|
+        pid_wait = pid
+        sleep wait_sleep
+        if wait_success
+          status = Class.new{def success?; true; end}.new
+        else
+          status = Class.new{def success?; false; end}.new
+        end
+        [pid, status]
+      end
+      stub(File).read("test-pid-file") { -1 }
+
+      # mock to check notify_new_supervisor_that_old_one_has_stopped sends SIGWINCH
+      if restart_canceled
+        mock(Process).kill(:WINCH, -1).never
+      else
+        mock(Process).kill(:WINCH, -1)
+      end
+
+      # == Act and Assert ==
+      server.before_run
+      server.zero_downtime_restart.join
+      sleep 1 # To wait a sub thread for waitpid in zero_downtime_restart
+      server.after_run
+
+      assert_equal(
+        [
+          !restart_canceled,
+          true,
+          Process.pid,
+          -1,
+        ],
+        [
+          server.instance_variable_get(:@starting_new_supervisor_with_zero_downtime),
+          env_spawn.key?("SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN"),
+          env_spawn["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"].to_i,
+          pid_wait,
+        ]
+      )
+    ensure
+      Fluent::Supervisor.cleanup_socketmanager_path
+      ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH')
+    end
+
+    def test_share_sockets
+      server = DummyServer.new
+      server.before_run
+      path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
+
+      client = ServerEngine::SocketManager::Client.new(path)
+      udp_port = unused_port(protocol: :udp)
+      tcp_port = unused_port(protocol: :tcp)
+      client.listen_udp("localhost", udp_port)
+      client.listen_tcp("localhost", tcp_port)
+
+      ENV['FLUENT_RUNNING_IN_PARALLEL_WITH_OLD'] = ""
+      new_server = DummyServer.new
+      stub(new_server).stop_parallel_old_supervisor_after_delay
+      new_server.before_run
+
+      assert_equal(
+        [[udp_port], [tcp_port]],
+        [
+          new_server.socket_manager_server.udp_sockets.values.map { |v| v.addr[1] },
+          new_server.socket_manager_server.tcp_sockets.values.map { |v| v.addr[1] },
+        ]
+      )
+    ensure
+      server&.after_run
+      new_server&.after_run
+      ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH')
+      ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
+    end
+
+    def test_stop_parallel_old_supervisor_after_delay
+      ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = ""
+      ENV['FLUENT_RUNNING_IN_PARALLEL_WITH_OLD'] = "-1"
+      stub(ServerEngine::SocketManager::Server).share_sockets_with_another_server
+      mock(Process).kill(:TERM, -1)
+
+      server = DummyServer.new
+      server.before_run
+      sleep 12 # Can't we skip the delay for this test?
+    ensure
+      server&.after_run
+      ENV.delete('SERVERENGINE_SOCKETMANAGER_PATH')
+      ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
+    end
+  end
+
   def create_debug_dummy_logger
     dl_opts = {}
     dl_opts[:log_level] = ServerEngine::DaemonLogger::DEBUG