Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds workunit for interactive processes. (Cherry-pick of #17544) #17996

Merged
merged 1 commit into from
Jan 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 141 additions & 135 deletions src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use rule_graph::DependencyKey;
use stdio::TryCloneAsFile;
use store::{SnapshotOps, SubsetParams};

use workunit_store::{in_workunit, Level};

type IntrinsicFn =
Box<dyn Fn(Context, Vec<Value>) -> BoxFuture<'static, NodeResult<Value>> + Send + Sync>;

Expand Down Expand Up @@ -512,147 +514,151 @@ fn interactive_process(
context: Context,
args: Vec<Value>,
) -> BoxFuture<'static, NodeResult<Value>> {
async move {
let types = &context.core.types;
let interactive_process_result = types.interactive_process_result;

let (py_interactive_process, py_process, process_config): (Value, Value, externs::process::PyProcessConfigFromEnvironment) = Python::with_gil(|py| {
let py_interactive_process = (*args[0]).as_ref(py);
let py_process: Value = externs::getattr(py_interactive_process, "process").unwrap();
let process_config = (*args[1])
.as_ref(py)
.extract()
.unwrap();
(py_interactive_process.extract().unwrap(), py_process, process_config)
});
match process_config.execution_strategy {
ProcessExecutionStrategy::Docker(_) | ProcessExecutionStrategy::RemoteExecution(_) => Err("InteractiveProcess should not set docker_image or remote_execution".to_owned()),
_ => Ok(())
}?;
let mut process = ExecuteProcess::lift(&context.core.store(), py_process, process_config).await?.process;
let (run_in_workspace, restartable, keep_sandboxes) = Python::with_gil(|py| {
let py_interactive_process_obj = py_interactive_process.to_object(py);
let py_interactive_process = py_interactive_process_obj.as_ref(py);
let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap();
let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap();
let keep_sandboxes_value: &PyAny = externs::getattr(py_interactive_process, "keep_sandboxes").unwrap();
let keep_sandboxes = KeepSandboxes::from_str(externs::getattr(keep_sandboxes_value, "value").unwrap()).unwrap();
(run_in_workspace, restartable, keep_sandboxes)
});

let session = context.session;

let mut tempdir = create_sandbox(
context.core.executor.clone(),
&context.core.local_execution_root_dir,
"interactive process",
keep_sandboxes,
)?;
prepare_workdir(
tempdir.path().to_owned(),
&process,
process.input_digests.input_files.clone(),
context.core.store(),
context.core.executor.clone(),
&context.core.named_caches,
&context.core.immutable_inputs,
None,
None,
)
.await?;
apply_chroot(tempdir.path().to_str().unwrap(), &mut process);

let p = Path::new(&process.argv[0]);
// TODO: Deprecate this program name calculation, and recommend `{chroot}` replacement in args
// instead.
let program_name = if !run_in_workspace && p.is_relative() {
let mut buf = PathBuf::new();
buf.push(tempdir.path());
buf.push(p);
buf
} else {
p.to_path_buf()
};

let mut command = process::Command::new(program_name);
if !run_in_workspace {
command.current_dir(tempdir.path());
}
for arg in process.argv[1..].iter() {
command.arg(arg);
}
in_workunit!(
"interactive_process",
Level::Debug,
|_workunit| async move {
let types = &context.core.types;
let interactive_process_result = types.interactive_process_result;

let (py_interactive_process, py_process, process_config): (Value, Value, externs::process::PyProcessConfigFromEnvironment) = Python::with_gil(|py| {
let py_interactive_process = (*args[0]).as_ref(py);
let py_process: Value = externs::getattr(py_interactive_process, "process").unwrap();
let process_config = (*args[1])
.as_ref(py)
.extract()
.unwrap();
(py_interactive_process.extract().unwrap(), py_process, process_config)
});
match process_config.execution_strategy {
ProcessExecutionStrategy::Docker(_) | ProcessExecutionStrategy::RemoteExecution(_) => Err("InteractiveProcess should not set docker_image or remote_execution".to_owned()),
_ => Ok(())
}?;
let mut process = ExecuteProcess::lift(&context.core.store(), py_process, process_config).await?.process;
let (run_in_workspace, restartable, keep_sandboxes) = Python::with_gil(|py| {
let py_interactive_process_obj = py_interactive_process.to_object(py);
let py_interactive_process = py_interactive_process_obj.as_ref(py);
let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap();
let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap();
let keep_sandboxes_value: &PyAny = externs::getattr(py_interactive_process, "keep_sandboxes").unwrap();
let keep_sandboxes = KeepSandboxes::from_str(externs::getattr(keep_sandboxes_value, "value").unwrap()).unwrap();
(run_in_workspace, restartable, keep_sandboxes)
});

let session = context.session;

let mut tempdir = create_sandbox(
context.core.executor.clone(),
&context.core.local_execution_root_dir,
"interactive process",
keep_sandboxes,
)?;
prepare_workdir(
tempdir.path().to_owned(),
&process,
process.input_digests.input_files.clone(),
context.core.store(),
context.core.executor.clone(),
&context.core.named_caches,
&context.core.immutable_inputs,
None,
None,
)
.await?;
apply_chroot(tempdir.path().to_str().unwrap(), &mut process);

let p = Path::new(&process.argv[0]);
// TODO: Deprecate this program name calculation, and recommend `{chroot}` replacement in args
// instead.
let program_name = if !run_in_workspace && p.is_relative() {
let mut buf = PathBuf::new();
buf.push(tempdir.path());
buf.push(p);
buf
} else {
p.to_path_buf()
};

let mut command = process::Command::new(program_name);
if !run_in_workspace {
command.current_dir(tempdir.path());
}
for arg in process.argv[1..].iter() {
command.arg(arg);
}

command.env_clear();
command.envs(process.env);
command.env_clear();
command.envs(process.env);

if !restartable {
task_side_effected()?;
}
if !restartable {
task_side_effected()?;
}

let exit_status = session.clone()
.with_console_ui_disabled(async move {
// Once any UI is torn down, grab exclusive access to the console.
let (term_stdin, term_stdout, term_stderr) =
stdio::get_destination().exclusive_start(Box::new(|_| {
// A stdio handler that will immediately trigger logging.
Err(())
}))?;
// NB: Command's stdio methods take ownership of a file-like to use, so we use
// `TryCloneAsFile` here to `dup` our thread-local stdio.
command
.stdin(Stdio::from(
term_stdin
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stdin: {}", e))?,
))
.stdout(Stdio::from(
term_stdout
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stdout: {}", e))?,
))
.stderr(Stdio::from(
term_stderr
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stderr: {}", e))?,
));
let mut subprocess = ManagedChild::spawn(command, context.core.graceful_shutdown_timeout)?;
tokio::select! {
_ = session.cancelled() => {
// The Session was cancelled: attempt to kill the process group / process, and
// then wait for it to exit (to avoid zombies).
if let Err(e) = subprocess.graceful_shutdown_sync() {
// Failed to kill the PGID: try the non-group form.
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
This is unexpected: please file an issue about this problem at \
[https://github.com/pantsbuild/pants/issues/new]", e);
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
};
subprocess.wait().await.map_err(|e| e.to_string())
}
exit_status = subprocess.wait() => {
// The process exited.
exit_status.map_err(|e| e.to_string())
let exit_status = session.clone()
.with_console_ui_disabled(async move {
// Once any UI is torn down, grab exclusive access to the console.
let (term_stdin, term_stdout, term_stderr) =
stdio::get_destination().exclusive_start(Box::new(|_| {
// A stdio handler that will immediately trigger logging.
Err(())
}))?;
// NB: Command's stdio methods take ownership of a file-like to use, so we use
// `TryCloneAsFile` here to `dup` our thread-local stdio.
command
.stdin(Stdio::from(
term_stdin
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stdin: {}", e))?,
))
.stdout(Stdio::from(
term_stdout
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stdout: {}", e))?,
))
.stderr(Stdio::from(
term_stderr
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stderr: {}", e))?,
));
let mut subprocess = ManagedChild::spawn(command, context.core.graceful_shutdown_timeout)?;
tokio::select! {
_ = session.cancelled() => {
// The Session was cancelled: attempt to kill the process group / process, and
// then wait for it to exit (to avoid zombies).
if let Err(e) = subprocess.graceful_shutdown_sync() {
// Failed to kill the PGID: try the non-group form.
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
This is unexpected: please file an issue about this problem at \
[https://github.com/pantsbuild/pants/issues/new]", e);
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
};
subprocess.wait().await.map_err(|e| e.to_string())
}
exit_status = subprocess.wait() => {
// The process exited.
exit_status.map_err(|e| e.to_string())
}
}
}
})
.await?;
})
.await?;

let code = exit_status.code().unwrap_or(-1);
if keep_sandboxes == KeepSandboxes::OnFailure && code != 0 {
tempdir.keep("interactive process");
}
let code = exit_status.code().unwrap_or(-1);
if keep_sandboxes == KeepSandboxes::OnFailure && code != 0 {
tempdir.keep("interactive process");
}

let result = {
let gil = Python::acquire_gil();
let py = gil.python();
externs::unsafe_call(
py,
interactive_process_result,
&[externs::store_i64(py, i64::from(code))],
)
};
Ok(result)
}.boxed()
let result = {
let gil = Python::acquire_gil();
let py = gil.python();
externs::unsafe_call(
py,
interactive_process_result,
&[externs::store_i64(py, i64::from(code))],
)
};
Ok(result)
}
).boxed()
}

fn docker_resolve_image(
Expand Down