From cd6f617f2a6e111234698838c942f7ad10702c8e Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 4 Nov 2021 17:53:47 -0400 Subject: [PATCH 1/4] Take `&mut self` consistently In order to implement proper error handling, we are going to need to also access the singleton child process state. (In the future we should support concurrent blob fetches which would return the `std::fs::File` which can be accessed concurrently/separately from the proxy. But our users aren't doing concurrent fetches yet) --- src/imageproxy.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 07260a3..2affdae 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -217,7 +217,7 @@ impl ImageProxy { } async fn impl_request( - &self, + &mut self, method: &str, args: T, ) -> Result<(R, Option<(File, u32)>)> @@ -229,7 +229,7 @@ impl ImageProxy { Self::impl_request_raw(Arc::clone(&self.sockfd), req).await } - async fn finish_pipe(&self, pipeid: u32) -> Result<()> { + async fn finish_pipe(&mut self, pipeid: u32) -> Result<()> { let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?; if fd.is_some() { return Err(anyhow!("Unexpected fd in finish_pipe reply")); @@ -237,21 +237,21 @@ impl ImageProxy { Ok(r) } - pub async fn open_image(&self, imgref: &str) -> Result { + pub async fn open_image(&mut self, imgref: &str) -> Result { let (imgid, _) = self .impl_request::("OpenImage", [imgref]) .await?; Ok(OpenedImage(imgid)) } - pub async fn close_image(&self, img: &OpenedImage) -> Result<()> { + pub async fn close_image(&mut self, img: &OpenedImage) -> Result<()> { let (r, _) = self.impl_request("CloseImage", [img.0]).await?; Ok(r) } /// Fetch the manifest. /// https://github.com/opencontainers/image-spec/blob/main/manifest.md - pub async fn fetch_manifest(&self, img: &OpenedImage) -> Result<(String, Vec)> { + pub async fn fetch_manifest(&mut self, img: &OpenedImage) -> Result<(String, Vec)> { let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?; let (fd, pipeid) = fd.ok_or_else(|| anyhow!("Missing fd from reply"))?; let mut fd = tokio::io::BufReader::new(tokio::fs::File::from_std(fd)); @@ -268,7 +268,7 @@ impl ImageProxy { /// Note that right now the proxy does verification of the digest: /// https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009 pub async fn get_blob( - &self, + &mut self, img: &OpenedImage, digest: &str, size: u64, From 056ced4f1e33ed208f62938ab338a085c1921af2 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 4 Nov 2021 18:04:34 -0400 Subject: [PATCH 2/4] Check for child exit during all requests I am seeing the proxy seeming to exit during a request. We need to also monitor the child process while making a request. --- src/imageproxy.rs | 62 ++++++++++++++++++++--------------------------- 1 file changed, 26 insertions(+), 36 deletions(-) diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 2affdae..f18519e 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -5,7 +5,7 @@ //! More information: use anyhow::{anyhow, Context, Result}; -use futures_util::{Future, FutureExt, TryFutureExt}; +use futures_util::Future; use nix::sys::socket::{self as nixsocket, ControlMessageOwned}; use nix::sys::uio::IoVec; use serde::{Deserialize, Serialize}; @@ -13,7 +13,7 @@ use std::fs::File; use std::os::unix::io::AsRawFd; use std::os::unix::prelude::{FromRawFd, RawFd}; use std::pin::Pin; -use std::process::{ExitStatus, Stdio}; +use std::process::Stdio; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncBufRead, AsyncReadExt}; @@ -65,13 +65,12 @@ struct Reply { value: serde_json::Value, } -type JoinFuture = Pin>>>>; +type ChildFuture = Pin>>>; /// Manage a child process proxy to fetch container images. pub struct ImageProxy { sockfd: Arc>, - stderr: JoinFuture, - procwait: Pin>>>, + childwait: ChildFuture, } impl std::fmt::Debug for ImageProxy { @@ -117,20 +116,8 @@ impl ImageProxy { c.stdin(Stdio::from(theirsock)); let mut c = tokio::process::Command::from(c); c.kill_on_drop(true); - let mut proc = c.spawn().context("Failed to spawn skopeo")?; - - // Safety: We passed `Stdio::piped()` above - let mut child_stderr = proc.stderr.take().unwrap(); - - let stderr = tokio::spawn(async move { - let mut buf = String::new(); - child_stderr.read_to_string(&mut buf).await?; - Ok(buf) - }) - .map_err(anyhow::Error::msg) - .boxed(); - - let mut procwait = Box::pin(async move { proc.wait().map_err(anyhow::Error::msg).await }); + let child = c.spawn().context("Failed to spawn skopeo")?; + let mut childwait = Box::pin(child.wait_with_output()); let sockfd = Arc::new(Mutex::new(mysock)); @@ -141,9 +128,10 @@ impl ImageProxy { r = protoreq => { r?.0 } - r = &mut procwait => { - let errmsg = stderr.await??; - return Err(anyhow!("skopeo exited unexpectedly (no support for `experimental-image-proxy`?): {}\n{}", r?, errmsg)); + r = &mut childwait => { + let r = r?; + let stderr = String::from_utf8_lossy(&r.stderr); + return Err(anyhow!("skopeo exited unexpectedly (no support for `experimental-image-proxy`?): {}\n{}", r.status, stderr)); } }; let protover = semver::Version::parse(protover.as_str())?; @@ -156,11 +144,7 @@ impl ImageProxy { )); } - let r = Self { - stderr, - sockfd, - procwait, - }; + let r = Self { sockfd, childwait }; Ok(r) } @@ -225,8 +209,17 @@ impl ImageProxy { T: IntoIterator, I: Into, { - let req = Request::new(method, args); - Self::impl_request_raw(Arc::clone(&self.sockfd), req).await + let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args)); + tokio::select! { + r = req => { + Ok(r?) + } + r = &mut self.childwait => { + let r = r?; + let stderr = String::from_utf8_lossy(&r.stderr); + return Err(anyhow::anyhow!("proxy unexpectedly exited during request method {}: {}\n{}", method, r.status, stderr)) + } + } } async fn finish_pipe(&mut self, pipeid: u32) -> Result<()> { @@ -293,13 +286,10 @@ impl ImageProxy { let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap(); nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?; drop(sendbuf); - let status = self.procwait.await?; - if !status.success() { - if let Some(stderr) = self.stderr.await.map(|v| v.ok()).ok().flatten() { - anyhow::bail!("proxy failed: {}\n{}", status, stderr) - } else { - anyhow::bail!("proxy failed: {} (failed to fetch stderr)", status) - } + let output = self.childwait.await?; + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("proxy failed: {}\n{}", output.status, stderr) } Ok(()) } From 94bd227ba765883fa145ae6e8aa1b59ea2347161 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 4 Nov 2021 18:48:12 -0400 Subject: [PATCH 3/4] Use main request method for semver checking Previously I refactored the code to have `impl_request_raw` to check the semver so we could do the child process checking at the same time. But now that that's a default part of the main request flow, clean things up here make the semver call not special. --- src/imageproxy.rs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/src/imageproxy.rs b/src/imageproxy.rs index f18519e..1a99856 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -117,23 +117,14 @@ impl ImageProxy { let mut c = tokio::process::Command::from(c); c.kill_on_drop(true); let child = c.spawn().context("Failed to spawn skopeo")?; - let mut childwait = Box::pin(child.wait_with_output()); + let childwait = Box::pin(child.wait_with_output()); let sockfd = Arc::new(Mutex::new(mysock)); + let mut r = Self { sockfd, childwait }; + // Verify semantic version - let protoreq = - Self::impl_request_raw::(Arc::clone(&sockfd), Request::new_bare("Initialize")); - let protover = tokio::select! { - r = protoreq => { - r?.0 - } - r = &mut childwait => { - let r = r?; - let stderr = String::from_utf8_lossy(&r.stderr); - return Err(anyhow!("skopeo exited unexpectedly (no support for `experimental-image-proxy`?): {}\n{}", r.status, stderr)); - } - }; + let protover = r.impl_request::("Initialize", []).await?.0; let protover = semver::Version::parse(protover.as_str())?; let supported = &*SUPPORTED_PROTO_VERSION; if !supported.matches(&protover) { @@ -144,7 +135,6 @@ impl ImageProxy { )); } - let r = Self { sockfd, childwait }; Ok(r) } From 8593d1a391836faf33a4613df84811d9aa368842 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Fri, 5 Nov 2021 09:24:56 -0400 Subject: [PATCH 4/4] Bump to 0.2.0 for API changes --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3b0f3c9..487e8d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ license = "MIT OR Apache-2.0" name = "containers-image-proxy" readme = "README.md" repository = "https://github.com/cgwalters/containers-image-proxy" -version = "0.1.1" +version = "0.2.0" [dependencies] anyhow = "1.0"