Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for child process monitoring #3

Merged
merged 4 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ license = "MIT OR Apache-2.0"
name = "containers-image-proxy"
readme = "README.md"
repository = "https://github.com/cgwalters/containers-image-proxy"
version = "0.1.1"
version = "0.2.0"

[dependencies]
anyhow = "1.0"
Expand Down
80 changes: 30 additions & 50 deletions src/imageproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
//! More information: <https://github.com/containers/skopeo/pull/1476>

use anyhow::{anyhow, Context, Result};
use futures_util::{Future, FutureExt, TryFutureExt};
use futures_util::Future;
use nix::sys::socket::{self as nixsocket, ControlMessageOwned};
use nix::sys::uio::IoVec;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::{FromRawFd, RawFd};
use std::pin::Pin;
use std::process::{ExitStatus, Stdio};
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufRead, AsyncReadExt};

Expand Down Expand Up @@ -65,13 +65,12 @@ struct Reply {
value: serde_json::Value,
}

type JoinFuture<T> = Pin<Box<dyn Future<Output = Result<Result<T>>>>>;
type ChildFuture = Pin<Box<dyn Future<Output = std::io::Result<std::process::Output>>>>;

/// Manage a child process proxy to fetch container images.
pub struct ImageProxy {
sockfd: Arc<Mutex<File>>,
stderr: JoinFuture<String>,
procwait: Pin<Box<dyn Future<Output = Result<ExitStatus>>>>,
childwait: ChildFuture,
}

impl std::fmt::Debug for ImageProxy {
Expand Down Expand Up @@ -117,35 +116,15 @@ impl ImageProxy {
c.stdin(Stdio::from(theirsock));
let mut c = tokio::process::Command::from(c);
c.kill_on_drop(true);
let mut proc = c.spawn().context("Failed to spawn skopeo")?;

// Safety: We passed `Stdio::piped()` above
let mut child_stderr = proc.stderr.take().unwrap();

let stderr = tokio::spawn(async move {
let mut buf = String::new();
child_stderr.read_to_string(&mut buf).await?;
Ok(buf)
})
.map_err(anyhow::Error::msg)
.boxed();

let mut procwait = Box::pin(async move { proc.wait().map_err(anyhow::Error::msg).await });
let child = c.spawn().context("Failed to spawn skopeo")?;
let childwait = Box::pin(child.wait_with_output());

let sockfd = Arc::new(Mutex::new(mysock));

let mut r = Self { sockfd, childwait };

// Verify semantic version
let protoreq =
Self::impl_request_raw::<String>(Arc::clone(&sockfd), Request::new_bare("Initialize"));
let protover = tokio::select! {
r = protoreq => {
r?.0
}
r = &mut procwait => {
let errmsg = stderr.await??;
return Err(anyhow!("skopeo exited unexpectedly (no support for `experimental-image-proxy`?): {}\n{}", r?, errmsg));
}
};
let protover = r.impl_request::<String, _, ()>("Initialize", []).await?.0;
let protover = semver::Version::parse(protover.as_str())?;
let supported = &*SUPPORTED_PROTO_VERSION;
if !supported.matches(&protover) {
Expand All @@ -156,11 +135,6 @@ impl ImageProxy {
));
}

let r = Self {
stderr,
sockfd,
procwait,
};
Ok(r)
}

Expand Down Expand Up @@ -217,41 +191,50 @@ impl ImageProxy {
}

async fn impl_request<R: serde::de::DeserializeOwned + Send + 'static, T, I>(
&self,
&mut self,
method: &str,
args: T,
) -> Result<(R, Option<(File, u32)>)>
where
T: IntoIterator<Item = I>,
I: Into<serde_json::Value>,
{
let req = Request::new(method, args);
Self::impl_request_raw(Arc::clone(&self.sockfd), req).await
let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args));
tokio::select! {
r = req => {
Ok(r?)
}
r = &mut self.childwait => {
let r = r?;
let stderr = String::from_utf8_lossy(&r.stderr);
return Err(anyhow::anyhow!("proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr))
}
}
}

async fn finish_pipe(&self, pipeid: u32) -> Result<()> {
async fn finish_pipe(&mut self, pipeid: u32) -> Result<()> {
let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
if fd.is_some() {
return Err(anyhow!("Unexpected fd in finish_pipe reply"));
}
Ok(r)
}

pub async fn open_image(&self, imgref: &str) -> Result<OpenedImage> {
pub async fn open_image(&mut self, imgref: &str) -> Result<OpenedImage> {
let (imgid, _) = self
.impl_request::<u32, _, _>("OpenImage", [imgref])
.await?;
Ok(OpenedImage(imgid))
}

pub async fn close_image(&self, img: &OpenedImage) -> Result<()> {
pub async fn close_image(&mut self, img: &OpenedImage) -> Result<()> {
let (r, _) = self.impl_request("CloseImage", [img.0]).await?;
Ok(r)
}

/// Fetch the manifest.
/// https://github.com/opencontainers/image-spec/blob/main/manifest.md
pub async fn fetch_manifest(&self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
pub async fn fetch_manifest(&mut self, img: &OpenedImage) -> Result<(String, Vec<u8>)> {
let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?;
let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?;
let mut fd = tokio::io::BufReader::new(tokio::fs::File::from_std(fd));
Expand All @@ -268,7 +251,7 @@ impl ImageProxy {
/// Note that right now the proxy does verification of the digest:
/// https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
pub async fn get_blob(
&self,
&mut self,
img: &OpenedImage,
digest: &str,
size: u64,
Expand All @@ -293,13 +276,10 @@ impl ImageProxy {
let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?;
drop(sendbuf);
let status = self.procwait.await?;
if !status.success() {
if let Some(stderr) = self.stderr.await.map(|v| v.ok()).ok().flatten() {
anyhow::bail!("proxy failed: {}\n{}", status, stderr)
} else {
anyhow::bail!("proxy failed: {} (failed to fetch stderr)", status)
}
let output = self.childwait.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("proxy failed: {}\n{}", output.status, stderr)
}
Ok(())
}
Expand Down