-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Set timeout for helper subprocesses to enhance stability #11125
Changes from 5 commits
dbb517d
87e2b02
7d157e0
78c2014
81a3e19
b6aef5f
50df531
1fce03a
db94c06
2c4dc10
d5083a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
# typed: strict | ||
# frozen_string_literal: true | ||
|
||
require "open3" | ||
require "timeout" | ||
require "sorbet-runtime" | ||
require "shellwords" | ||
|
||
module Dependabot | ||
module CommandHelpers | ||
extend T::Sig | ||
|
||
class ProcessStatus | ||
extend T::Sig | ||
|
||
sig { params(process_status: Process::Status, custom_exitstatus: T.nilable(Integer)).void } | ||
def initialize(process_status, custom_exitstatus = nil) | ||
@process_status = process_status | ||
@custom_exitstatus = custom_exitstatus | ||
end | ||
|
||
# Return the exit status, either from the process status or the custom one | ||
sig { returns(Integer) } | ||
def exitstatus | ||
@custom_exitstatus || @process_status.exitstatus || 0 | ||
end | ||
|
||
# Determine if the process was successful | ||
sig { returns(T::Boolean) } | ||
def success? | ||
@custom_exitstatus.nil? ? @process_status.success? || false : @custom_exitstatus.zero? | ||
end | ||
|
||
# Return the PID of the process (if available) | ||
sig { returns(T.nilable(Integer)) } | ||
def pid | ||
@process_status.pid | ||
end | ||
|
||
sig { returns(T.nilable(Integer)) } | ||
def termsig | ||
@process_status.termsig | ||
end | ||
|
||
# String representation of the status | ||
sig { returns(String) } | ||
def to_s | ||
if @custom_exitstatus | ||
"pid #{pid || 'unknown'}: exit #{@custom_exitstatus} (custom status)" | ||
else | ||
@process_status.to_s | ||
end | ||
end | ||
end | ||
|
||
DEFAULT_TIMEOUTS = T.let({ | ||
no_time_out: -1, # No timeout | ||
local: 30, # Local commands | ||
network: 120, # Network-dependent commands | ||
long_running: 300 # Long-running tasks (e.g., builds) | ||
}.freeze, T::Hash[T.untyped, T.untyped]) | ||
|
||
# rubocop:disable Metrics/AbcSize | ||
# rubocop:disable Metrics/MethodLength | ||
# rubocop:disable Metrics/PerceivedComplexity | ||
# rubocop:disable Metrics/CyclomaticComplexity | ||
sig do | ||
params( | ||
env_cmd: T::Array[T.any(T::Hash[String, String], String)], | ||
stdin_data: T.nilable(String), | ||
stderr_to_stdout: T::Boolean, | ||
command_type: Symbol, | ||
timeout: Integer | ||
).returns([T.nilable(String), T.nilable(String), T.nilable(ProcessStatus), Float]) | ||
end | ||
def self.capture3_with_timeout( | ||
env_cmd, | ||
stdin_data: nil, | ||
stderr_to_stdout: false, | ||
command_type: :network, | ||
timeout: -1 | ||
) | ||
# Assign default timeout based on command type if timeout < 0 | ||
timeout = DEFAULT_TIMEOUTS[command_type] if timeout.negative? | ||
|
||
stdout = T.let("", String) | ||
stderr = T.let("", String) | ||
status = T.let(nil, T.nilable(ProcessStatus)) | ||
pid = T.let(nil, T.untyped) | ||
start_time = Time.now | ||
|
||
begin | ||
T.unsafe(Open3).popen3(*env_cmd) do |stdin, stdout_io, stderr_io, wait_thr| | ||
pid = wait_thr.pid | ||
stdin&.write(stdin_data) if stdin_data | ||
stdin&.close | ||
|
||
stdout_io.sync = true | ||
stderr_io.sync = true | ||
|
||
# Array to monitor both stdout and stderr | ||
ios = [stdout_io, stderr_io] | ||
|
||
last_output_time = Time.now # Track the last time output was received | ||
|
||
until ios.empty? | ||
# Calculate remaining timeout dynamically | ||
remaining_timeout = timeout - (Time.now - last_output_time) | ||
|
||
# Raise an error if timeout is exceeded | ||
if remaining_timeout <= 0 | ||
terminate_process(pid) | ||
status = ProcessStatus.new(wait_thr.value, 124) | ||
raise Timeout::Error, "Timed out due to inactivity after #{timeout} seconds" | ||
end | ||
|
||
# Use IO.select with a dynamically calculated short timeout | ||
ready_ios = IO.select(ios, nil, nil, [0.1, remaining_timeout].min) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've never worked with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are not monitoring for write readiness ( |
||
|
||
# Process ready IO streams | ||
ready_ios&.first&.each do |io| | ||
io.set_encoding("UTF-8", "UTF-8") | ||
|
||
data = io.read_nonblock(1024) | ||
|
||
last_output_time = Time.now | ||
if io == stdout_io | ||
stdout += data | ||
else | ||
stderr += data unless stderr_to_stdout | ||
stdout += data if stderr_to_stdout | ||
end | ||
rescue EOFError | ||
ios.delete(io) | ||
rescue IO::WaitReadable | ||
next | ||
end | ||
end | ||
|
||
status = ProcessStatus.new(wait_thr.value) | ||
end | ||
rescue Timeout::Error => e | ||
stderr += e.message unless stderr_to_stdout | ||
stdout += e.message if stderr_to_stdout | ||
rescue Errno::ENOENT => e | ||
stderr += e.message unless stderr_to_stdout | ||
stdout += e.message if stderr_to_stdout | ||
end | ||
|
||
elapsed_time = Time.now - start_time | ||
[stdout, stderr, status, elapsed_time] | ||
end | ||
# rubocop:enable Metrics/AbcSize | ||
# rubocop:enable Metrics/MethodLength | ||
# rubocop:enable Metrics/PerceivedComplexity | ||
# rubocop:enable Metrics/CyclomaticComplexity | ||
|
||
sig { params(pid: T.nilable(Integer)).void } | ||
def self.terminate_process(pid) | ||
return unless pid | ||
|
||
begin | ||
if process_alive?(pid) | ||
Process.kill("TERM", pid) # Attempt graceful termination | ||
sleep(0.5) # Allow process to terminate | ||
end | ||
if process_alive?(pid) | ||
Process.kill("KILL", pid) # Forcefully kill if still running | ||
end | ||
rescue Errno::EPERM | ||
Dependabot.logger.error("Insufficient permissions to terminate process: #{pid}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens in this case? Will the job still terminate? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In natural way if it terminates (old way) we don't do anything. If it doesn't then the operation is going to get canceled after hanging out as how it was before. But generally we should have permission to do it since we initiated the command in this thread. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would be surprised if this ever triggers TBH. This is only ever going to terminate a sub-process, and AFAIK the parent process should always have perms to terminate a child process. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I aggree. |
||
ensure | ||
begin | ||
Process.waitpid(pid) | ||
rescue Errno::ESRCH, Errno::ECHILD | ||
# Process has already exited | ||
end | ||
end | ||
end | ||
|
||
sig { params(pid: T.nilable(Integer)).returns(T::Boolean) } | ||
def self.process_alive?(pid) | ||
return false if pid.nil? # No PID, consider process not alive | ||
|
||
begin | ||
Process.kill(0, pid) # Check if the process exists | ||
true # Process is still alive | ||
rescue Errno::ESRCH | ||
false # Process does not exist (terminated successfully) | ||
rescue Errno::EPERM | ||
Dependabot.logger.error("Insufficient permissions to check process: #{pid}") | ||
false # Assume process not alive due to lack of permissions | ||
end | ||
end | ||
|
||
sig { params(command: String).returns(String) } | ||
def self.escape_command(command) | ||
command_parts = command.split.map(&:strip).reject(&:empty?) | ||
Shellwords.join(command_parts) | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,6 @@ | |
require "fileutils" | ||
require "json" | ||
require "open3" | ||
require "shellwords" | ||
require "sorbet-runtime" | ||
require "tmpdir" | ||
|
||
|
@@ -17,9 +16,10 @@ | |
require "dependabot/errors" | ||
require "dependabot/workspace" | ||
require "dependabot" | ||
require "dependabot/command_helpers" | ||
|
||
module Dependabot | ||
module SharedHelpers | ||
module SharedHelpers # rubocop:disable Metrics/ModuleLength | ||
extend T::Sig | ||
|
||
GIT_CONFIG_GLOBAL_PATH = T.let(File.expand_path(".gitconfig", Utils::BUMP_TMP_DIR_PATH), String) | ||
|
@@ -121,8 +121,7 @@ def sentry_context | |
# Escapes all special characters, e.g. = & | <> | ||
sig { params(command: String).returns(String) } | ||
def self.escape_command(command) | ||
command_parts = command.split.map(&:strip).reject(&:empty?) | ||
Shellwords.join(command_parts) | ||
CommandHelpers.escape_command(command) | ||
end | ||
|
||
# rubocop:disable Metrics/MethodLength | ||
|
@@ -135,14 +134,18 @@ def self.escape_command(command) | |
env: T.nilable(T::Hash[String, String]), | ||
stderr_to_stdout: T::Boolean, | ||
allow_unsafe_shell_command: T::Boolean, | ||
error_class: T.class_of(HelperSubprocessFailed) | ||
error_class: T.class_of(HelperSubprocessFailed), | ||
command_type: Symbol, | ||
timeout: Integer | ||
) | ||
.returns(T.nilable(T.any(String, T::Hash[String, T.untyped], T::Array[T::Hash[String, T.untyped]]))) | ||
end | ||
def self.run_helper_subprocess(command:, function:, args:, env: nil, | ||
stderr_to_stdout: false, | ||
allow_unsafe_shell_command: false, | ||
error_class: HelperSubprocessFailed) | ||
error_class: HelperSubprocessFailed, | ||
command_type: :network, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Review Tip: We set the default timeout to 120 seconds (using Note: The timeout applies to the time between outputs to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TBH this feels more complex/confusing than it needs to be. This introduces magic symbols that callers must know to use, where those symbols are mapped to timeout values in the new command helpers class. There's also some complexity coming from the addition of both command type and timeout arguments. As a caller if I didn't know the internals of this function I would be confused that a command that was run with WDYT about creating constants for different common timeout values and having callers use those constants to set a timeout value when needed. This removes having two params, makes the timeout field explicit, and maintains named common timeout values. module CommandTimeouts
NETWORK = 120 # seconds
LONG_RUNNING = 300 # seconds
NONE = -1 # seconds
LOCAL_TASK = 30 # seconds
DEFAULT = NETWORK
end
def self.run_helper_subprocess(... timeout: CommandTimeouts::DEFAULT...) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is fixed. Please let me know if you see any issue now. |
||
timeout: -1) | ||
start = Time.now | ||
stdin_data = JSON.dump(function: function, args: args) | ||
cmd = allow_unsafe_shell_command ? command : escape_command(command) | ||
|
@@ -157,7 +160,16 @@ def self.run_helper_subprocess(command:, function:, args:, env: nil, | |
end | ||
|
||
env_cmd = [env, cmd].compact | ||
stdout, stderr, process = T.unsafe(Open3).capture3(*env_cmd, stdin_data: stdin_data) | ||
if Experiments.enabled?(:enable_shared_helpers_command_timeout) | ||
stdout, stderr, process = CommandHelpers.capture3_with_timeout( | ||
env_cmd, | ||
stdin_data: stdin_data, | ||
command_type: command_type, | ||
timeout: timeout | ||
) | ||
else | ||
stdout, stderr, process = T.unsafe(Open3).capture3(*env_cmd, stdin_data: stdin_data) | ||
end | ||
time_taken = Time.now - start | ||
|
||
if ENV["DEBUG_HELPERS"] == "true" | ||
|
@@ -177,16 +189,16 @@ def self.run_helper_subprocess(command:, function:, args:, env: nil, | |
function: function, | ||
args: args, | ||
time_taken: time_taken, | ||
stderr_output: stderr ? stderr[0..50_000] : "", # Truncate to ~100kb | ||
stderr_output: stderr[0..50_000], # Truncate to ~100kb | ||
process_exit_value: process.to_s, | ||
process_termsig: process.termsig | ||
process_termsig: process&.termsig | ||
} | ||
|
||
check_out_of_memory_error(stderr, error_context, error_class) | ||
|
||
begin | ||
response = JSON.parse(stdout) | ||
return response["result"] if process.success? | ||
return response["result"] if process&.success? | ||
|
||
raise error_class.new( | ||
message: response["error"], | ||
|
@@ -415,22 +427,27 @@ def self.find_safe_directories | |
safe_directories | ||
end | ||
|
||
# rubocop:disable Metrics/PerceivedComplexity | ||
sig do | ||
params( | ||
command: String, | ||
allow_unsafe_shell_command: T::Boolean, | ||
cwd: T.nilable(String), | ||
env: T.nilable(T::Hash[String, String]), | ||
fingerprint: T.nilable(String), | ||
stderr_to_stdout: T::Boolean | ||
stderr_to_stdout: T::Boolean, | ||
command_type: Symbol, | ||
timeout: Integer | ||
).returns(String) | ||
end | ||
def self.run_shell_command(command, | ||
allow_unsafe_shell_command: false, | ||
cwd: nil, | ||
env: {}, | ||
fingerprint: nil, | ||
stderr_to_stdout: true) | ||
stderr_to_stdout: true, | ||
command_type: :network, | ||
timeout: -1) | ||
start = Time.now | ||
cmd = allow_unsafe_shell_command ? command : escape_command(command) | ||
|
||
|
@@ -439,7 +456,15 @@ def self.run_shell_command(command, | |
opts = {} | ||
opts[:chdir] = cwd if cwd | ||
|
||
if stderr_to_stdout | ||
env_cmd = [env || {}, cmd, opts].compact | ||
if Experiments.enabled?(:enable_shared_helpers_command_timeout) | ||
stdout, stderr, process, _elapsed_time = CommandHelpers.capture3_with_timeout( | ||
env_cmd, | ||
stderr_to_stdout: stderr_to_stdout, | ||
command_type: command_type, | ||
timeout: timeout | ||
) | ||
elsif stderr_to_stdout | ||
stdout, process = Open3.capture2e(env || {}, cmd, opts) | ||
else | ||
stdout, stderr, process = Open3.capture3(env || {}, cmd, opts) | ||
|
@@ -449,7 +474,7 @@ def self.run_shell_command(command, | |
|
||
# Raise an error with the output from the shell session if the | ||
# command returns a non-zero status | ||
return stdout if process.success? | ||
return stdout || "" if process&.success? | ||
|
||
error_context = { | ||
command: cmd, | ||
|
@@ -461,10 +486,11 @@ def self.run_shell_command(command, | |
check_out_of_disk_memory_error(stderr, error_context) | ||
|
||
raise SharedHelpers::HelperSubprocessFailed.new( | ||
message: stderr_to_stdout ? stdout : "#{stderr}\n#{stdout}", | ||
message: stderr_to_stdout ? (stdout || "") : "#{stderr}\n#{stdout}", | ||
error_context: error_context | ||
) | ||
end | ||
# rubocop:enable Metrics/PerceivedComplexity | ||
|
||
sig { params(stderr: T.nilable(String), error_context: T::Hash[Symbol, String]).void } | ||
def self.check_out_of_disk_memory_error(stderr, error_context) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to keep these as
T::Hash[T.untyped, T.untyped]
? We're controlling them and all the values suggest to me thatT::Hash[Symbol, Integer]
would be the signature.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Will fix it.