diff --git a/ci/installdeps.sh b/ci/installdeps.sh index 606032ed..15e668df 100755 --- a/ci/installdeps.sh +++ b/ci/installdeps.sh @@ -1,10 +1,17 @@ #!/bin/bash set -xeuo pipefail -yum -y install skopeo -yum -y --enablerepo=updates-testing update ostree-devel +# Always pull ostree from updates-testing to avoid the bodhi wait +dnf -y --enablerepo=updates-testing update ostree-devel + +# Pull the code from https://github.com/containers/skopeo/pull/1476 +# if necessary. +if ! skopeo experimental-image-proxy --help &>/dev/null; then + dnf -y install dnf-utils + dnf builddep -y skopeo + git clone --depth=1 https://github.com/containers/skopeo + cd skopeo + make + install -m 0755 bin/skopeo /usr/bin/ +fi -git clone --depth=1 https://github.com/cgwalters/container-image-proxy -cd container-image-proxy -make -install -m 0755 bin/container-image-proxy /usr/bin/ diff --git a/lib/Cargo.toml b/lib/Cargo.toml index ba294fc3..27578184 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -10,6 +10,7 @@ version = "0.4.0-alpha.0" [dependencies] anyhow = "1.0" +containers-image-proxy = { version = "0.1", git = "https://github.com/cgwalters/containers-image-proxy-rs" } async-compression = { version = "0.3", features = ["gzip", "tokio"] } bytes = "1.0.1" bitflags = "1" @@ -20,7 +21,6 @@ fn-error-context = "0.2.0" futures-util = "0.3.13" gvariant = "0.4.0" hex = "0.4.3" -hyper = { version = "0.14", features = ["full"] } indicatif = "0.16.0" lazy_static = "1.4.0" libc = "0.2.92" diff --git a/lib/src/container/imageproxy.rs b/lib/src/container/imageproxy.rs deleted file mode 100644 index 005a43f6..00000000 --- a/lib/src/container/imageproxy.rs +++ /dev/null @@ -1,157 +0,0 @@ -//! Run container-image-proxy as a subprocess. -//! This allows fetching a container image manifest and layers in a streaming fashion. -//! More information: - -use super::{oci, ImageReference, Result}; -use crate::cmdext::CommandRedirectionExt; -use anyhow::Context; -use futures_util::{Future, FutureExt, TryFutureExt, TryStreamExt}; -use hyper::body::HttpBody; -use hyper::client::conn::{Builder, SendRequest}; -use hyper::{Body, Request, StatusCode}; -use std::os::unix::prelude::AsRawFd; -use std::pin::Pin; -use std::process::Stdio; -use tokio::io::{AsyncBufRead, AsyncReadExt}; - -// What we get from boxing a fallible tokio::spawn() closure. Note the nested Result. -type JoinFuture = Pin>>>>; - -/// Manage a child process proxy to fetch container images. -pub(crate) struct ImageProxy { - proc: tokio::process::Child, - request_sender: SendRequest, - stderr: JoinFuture, - driver: JoinFuture<()>, -} - -impl ImageProxy { - /// Create an image proxy that fetches the target image. - pub(crate) async fn new(imgref: &ImageReference) -> Result { - // Communicate over an anonymous socketpair(2) - let (mysock, childsock) = tokio::net::UnixStream::pair()?; - let childsock = childsock.into_std()?; - let mut c = std::process::Command::new("container-image-proxy"); - c.arg(&imgref.to_string()); - c.stdout(Stdio::null()).stderr(Stdio::piped()); - if let Some(port) = std::env::var_os("OSTREE_IMAGE_PROXY_PORT") { - c.arg("--port"); - c.arg(port); - } else { - // Pass one half of the pair as fd 3 to the child - let target_fd = 3; - c.arg("--sockfd"); - c.arg(&format!("{}", target_fd)); - c.take_fd_n(childsock.as_raw_fd(), target_fd); - } - let mut c = tokio::process::Command::from(c); - c.kill_on_drop(true); - let mut proc = c.spawn().context("Failed to spawn container-image-proxy")?; - // We've passed over the fd, close it. - drop(childsock); - - // Safety: We passed `Stdio::piped()` above - let mut child_stderr = proc.stderr.take().unwrap(); - - // Connect via HTTP to the child - let (request_sender, connection) = Builder::new().handshake::<_, Body>(mysock).await?; - // Background driver that manages things like timeouts. - let driver = tokio::spawn(connection.map_err(anyhow::Error::msg)) - .map_err(anyhow::Error::msg) - .boxed(); - let stderr = tokio::spawn(async move { - let mut buf = String::new(); - child_stderr.read_to_string(&mut buf).await?; - Ok(buf) - }) - .map_err(anyhow::Error::msg) - .boxed(); - Ok(Self { - proc, - stderr, - request_sender, - driver, - }) - } - - /// Fetch the manifest. - /// https://github.com/opencontainers/image-spec/blob/main/manifest.md - pub(crate) async fn fetch_manifest(&mut self) -> Result<(String, Vec)> { - let req = Request::builder() - .header("Host", "localhost") - .method("GET") - .uri("/manifest") - .body(Body::from(""))?; - let mut resp = self.request_sender.send_request(req).await?; - if resp.status() != StatusCode::OK { - return Err(anyhow::anyhow!("error from proxy: {}", resp.status())); - } - let hname = "Manifest-Digest"; - let digest = resp - .headers() - .get(hname) - .ok_or_else(|| anyhow::anyhow!("Missing {} header", hname))? - .to_str() - .with_context(|| format!("Invalid {} header", hname))? - .to_string(); - let mut ret = Vec::new(); - while let Some(chunk) = resp.body_mut().data().await { - let chunk = chunk?; - ret.extend_from_slice(&chunk); - } - Ok((digest, ret)) - } - - /// Fetch a blob identified by e.g. `sha256:`. - /// https://github.com/opencontainers/image-spec/blob/main/descriptor.md - /// Note that right now the proxy does verification of the digest: - /// https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009 - pub(crate) async fn fetch_blob( - &mut self, - digest: &str, - ) -> Result { - let uri = format!("/blobs/{}", digest); - let req = Request::builder() - .header("Host", "localhost") - .method("GET") - .uri(&uri) - .body(Body::from(""))?; - let resp = self.request_sender.send_request(req).await?; - let status = resp.status(); - let body = TryStreamExt::map_err(resp.into_body(), |e| { - std::io::Error::new(std::io::ErrorKind::Other, e) - }); - let mut body = tokio_util::io::StreamReader::new(body); - if status != StatusCode::OK { - let mut s = String::new(); - let _: usize = body.read_to_string(&mut s).await?; - return Err(anyhow::anyhow!("error from proxy: {}: {}", status, s)); - } - Ok(body) - } - - /// A wrapper for [`fetch_blob`] which fetches a layer and decompresses it. - pub(crate) async fn fetch_layer_decompress( - &mut self, - layer: &oci::ManifestLayer, - ) -> Result> { - let blob = self.fetch_blob(layer.digest.as_str()).await?; - Ok(layer.new_async_decompressor(blob)?) - } - - /// Close the HTTP connection and wait for the child process to exit successfully. - pub(crate) async fn finalize(mut self) -> Result<()> { - // For now discard any errors from the connection - drop(self.request_sender); - let _r = self.driver.await??; - let status = self.proc.wait().await?; - if !status.success() { - if let Some(stderr) = self.stderr.await.map(|v| v.ok()).ok().flatten() { - anyhow::bail!("proxy failed: {}\n{}", status, stderr) - } else { - anyhow::bail!("proxy failed: {} (failed to fetch stderr)", status) - } - } - Ok(()) - } -} diff --git a/lib/src/container/import.rs b/lib/src/container/import.rs index 59211b26..2d16b6e1 100644 --- a/lib/src/container/import.rs +++ b/lib/src/container/import.rs @@ -30,8 +30,11 @@ use super::*; use anyhow::{anyhow, Context}; +use containers_image_proxy::{ImageProxy, OpenedImage}; +use containers_image_proxy::{OCI_TYPE_LAYER_GZIP, OCI_TYPE_LAYER_TAR}; use fn_error_context::context; -use tokio::io::AsyncRead; +use futures_util::Future; +use tokio::io::{AsyncBufRead, AsyncRead}; use tracing::{event, instrument, Level}; /// The result of an import operation @@ -84,8 +87,10 @@ impl AsyncRead for ProgressReader { /// Download the manifest for a target image and its sha256 digest. #[context("Fetching manifest")] pub async fn fetch_manifest(imgref: &OstreeImageReference) -> Result<(Vec, String)> { - let mut proxy = imageproxy::ImageProxy::new(&imgref.imgref).await?; - let (digest, raw_manifest) = proxy.fetch_manifest().await?; + let proxy = ImageProxy::new().await?; + let oi = &proxy.open_image(&imgref.imgref.to_string()).await?; + let (digest, raw_manifest) = proxy.fetch_manifest(oi).await?; + proxy.close_image(oi).await?; Ok((raw_manifest, digest)) } @@ -135,6 +140,36 @@ pub async fn import( }) } +/// Create a decompressor for this MIME type, given a stream of input. +fn new_async_decompressor<'a>( + media_type: &str, + src: impl AsyncBufRead + Send + Unpin + 'a, +) -> Result> { + match media_type { + OCI_TYPE_LAYER_GZIP => Ok(Box::new(tokio::io::BufReader::new( + async_compression::tokio::bufread::GzipDecoder::new(src), + ))), + OCI_TYPE_LAYER_TAR => Ok(Box::new(src)), + o => Err(anyhow::anyhow!("Unhandled layer type: {}", o)), + } +} + +/// A wrapper for [`get_blob`] which fetches a layer and decompresses it. +pub(crate) async fn fetch_layer_decompress<'a>( + proxy: &'a ImageProxy, + img: &OpenedImage, + layer: &oci::ManifestLayer, +) -> Result<( + Box, + impl Future> + 'a, +)> { + let (blob, driver) = proxy + .get_blob(img, layer.digest.as_str(), layer.size) + .await?; + let blob = new_async_decompressor(&layer.media_type, blob)?; + Ok((blob, driver)) +} + /// Fetch a container image using an in-memory manifest and import its embedded OSTree commit. #[context("Importing {}", imgref)] #[instrument(skip(repo, options, manifest_bytes))] @@ -152,9 +187,15 @@ pub async fn import_from_manifest( let options = options.unwrap_or_default(); let manifest: oci::Manifest = serde_json::from_slice(manifest_bytes)?; let layer = require_one_layer_blob(&manifest)?; - event!(Level::DEBUG, "target blob: {}", layer.digest.as_str()); - let mut proxy = imageproxy::ImageProxy::new(&imgref.imgref).await?; - let blob = proxy.fetch_layer_decompress(layer).await?; + event!( + Level::DEBUG, + "target blob digest:{} size: {}", + layer.digest.as_str(), + layer.size + ); + let proxy = ImageProxy::new().await?; + let oi = &proxy.open_image(&imgref.imgref.to_string()).await?; + let (blob, driver) = fetch_layer_decompress(&proxy, oi, layer).await?; let blob = ProgressReader { reader: blob, progress: options.progress, @@ -164,9 +205,10 @@ pub async fn import_from_manifest( SignatureSource::OstreeRemote(remote) => taropts.remote = Some(remote.clone()), SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => {} } - let ostree_commit = crate::tar::import_tar(repo, blob, Some(taropts)) - .await - .with_context(|| format!("Parsing blob {}", layer.digest))?; + let import = crate::tar::import_tar(repo, blob, Some(taropts)); + let (import, driver) = tokio::join!(import, driver); + driver?; + let ostree_commit = import.with_context(|| format!("Parsing blob {}", layer.digest))?; // FIXME write ostree commit after proxy finalization proxy.finalize().await?; event!(Level::DEBUG, "created commit {}", ostree_commit); diff --git a/lib/src/container/mod.rs b/lib/src/container/mod.rs index 2612a47f..7d0405ce 100644 --- a/lib/src/container/mod.rs +++ b/lib/src/container/mod.rs @@ -228,7 +228,6 @@ mod export; pub use export::*; mod import; pub use import::*; -mod imageproxy; mod oci; mod skopeo; pub mod store; diff --git a/lib/src/container/oci.rs b/lib/src/container/oci.rs index 4c9724c8..d66f2db4 100644 --- a/lib/src/container/oci.rs +++ b/lib/src/container/oci.rs @@ -2,6 +2,7 @@ //! oriented towards generating images. use anyhow::{anyhow, Result}; +use containers_image_proxy::OCI_TYPE_LAYER_GZIP; use flate2::write::GzEncoder; use fn_error_context::context; use openat_ext::*; @@ -10,7 +11,6 @@ use phf::phf_map; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::io::prelude::*; -use tokio::io::AsyncBufRead; /// Map the value from `uname -m` to the Go architecture. /// TODO find a more canonical home for this. @@ -22,10 +22,6 @@ static MACHINE_TO_OCI: phf::Map<&str, &str> = phf_map! { // OCI types, see https://github.com/opencontainers/image-spec/blob/master/media-types.md pub(crate) const OCI_TYPE_CONFIG_JSON: &str = "application/vnd.oci.image.config.v1+json"; pub(crate) const OCI_TYPE_MANIFEST_JSON: &str = "application/vnd.oci.image.manifest.v1+json"; -pub(crate) const OCI_TYPE_LAYER_GZIP: &str = "application/vnd.oci.image.layer.v1.tar+gzip"; -pub(crate) const OCI_TYPE_LAYER_TAR: &str = "application/vnd.oci.image.layer.v1.tar"; -// FIXME - use containers/image to fully convert the manifest to OCI -const DOCKER_TYPE_LAYER_TARGZ: &str = "application/vnd.docker.image.rootfs.diff.tar.gzip"; /// Path inside an OCI directory to the blobs const BLOBDIR: &str = "blobs/sha256"; @@ -68,22 +64,6 @@ pub(crate) struct ManifestLayer { pub size: u64, } -impl ManifestLayer { - /// Create a decompressor for this layer, given a stream of input. - pub fn new_async_decompressor( - &self, - src: impl AsyncBufRead + Send + Unpin + 'static, - ) -> Result> { - match self.media_type.as_str() { - OCI_TYPE_LAYER_GZIP | DOCKER_TYPE_LAYER_TARGZ => Ok(Box::new( - tokio::io::BufReader::new(async_compression::tokio::bufread::GzipDecoder::new(src)), - )), - OCI_TYPE_LAYER_TAR => Ok(Box::new(src)), - o => Err(anyhow::anyhow!("Unhandled layer type: {}", o)), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub(crate) struct Manifest { diff --git a/lib/src/container/store.rs b/lib/src/container/store.rs index 1c9b282f..b93e0828 100644 --- a/lib/src/container/store.rs +++ b/lib/src/container/store.rs @@ -5,11 +5,11 @@ //! This code supports ingesting arbitrary layered container images from an ostree-exported //! base. See [`super::import`] for more information on encaspulation of images. -use super::imageproxy::ImageProxy; use super::oci::ManifestLayer; use super::*; use crate::refescape; use anyhow::{anyhow, Context}; +use containers_image_proxy::{ImageProxy, OpenedImage}; use fn_error_context::context; use ostree::prelude::{Cast, ToVariant}; use ostree::{gio, glib}; @@ -45,6 +45,7 @@ pub struct LayeredImageImporter { repo: ostree::Repo, proxy: ImageProxy, imgref: OstreeImageReference, + proxy_img: OpenedImage, ostree_ref: String, } @@ -135,12 +136,14 @@ fn manifest_from_commit(commit: &glib::Variant) -> Result { impl LayeredImageImporter { /// Create a new importer. pub async fn new(repo: &ostree::Repo, imgref: &OstreeImageReference) -> Result { - let proxy = ImageProxy::new(&imgref.imgref).await?; + let proxy = ImageProxy::new().await?; + let proxy_img = proxy.open_image(&imgref.imgref.to_string()).await?; let repo = repo.clone(); let ostree_ref = ref_for_image(&imgref.imgref)?; Ok(LayeredImageImporter { repo, proxy, + proxy_img, ostree_ref, imgref: imgref.clone(), }) @@ -161,7 +164,7 @@ impl LayeredImageImporter { _ => {} } - let (manifest_digest, manifest_bytes) = self.proxy.fetch_manifest().await?; + let (manifest_digest, manifest_bytes) = self.proxy.fetch_manifest(&self.proxy_img).await?; let manifest: oci::Manifest = serde_json::from_slice(&manifest_bytes)?; let new_imageid = manifest.imageid(); @@ -214,17 +217,23 @@ impl LayeredImageImporter { } /// Import a layered container image - pub async fn import(mut self, import: PreparedImport) -> Result { + pub async fn import(self, import: PreparedImport) -> Result { + let proxy = self.proxy; // First download the base image (if necessary) - we need the SELinux policy // there to label all following layers. let base_layer = import.base_layer; let base_commit = if let Some(c) = base_layer.commit { c } else { - let blob = self.proxy.fetch_layer_decompress(&base_layer.layer).await?; - let commit = crate::tar::import_tar(&self.repo, blob, None) - .await - .with_context(|| format!("Parsing blob {}", &base_layer.digest()))?; + let base_layer_ref = &base_layer.layer; + let (blob, driver) = + super::import::fetch_layer_decompress(&proxy, &self.proxy_img, &base_layer.layer) + .await?; + let importer = crate::tar::import_tar(&self.repo, blob, None); + let (commit, driver) = tokio::join!(importer, driver); + driver?; + let commit = + commit.with_context(|| format!("Parsing blob {}", &base_layer_ref.digest))?; // TODO support ref writing in tar import self.repo.set_ref_immediate( None, @@ -241,17 +250,20 @@ impl LayeredImageImporter { if let Some(c) = layer.commit { layer_commits.push(c.to_string()); } else { - let blob = self.proxy.fetch_layer_decompress(&layer.layer).await?; + let (blob, driver) = + super::import::fetch_layer_decompress(&proxy, &self.proxy_img, &layer.layer) + .await?; // An important aspect of this is that we SELinux label the derived layers using // the base policy. let opts = crate::tar::WriteTarOptions { base: Some(base_commit.clone()), selinux: true, }; - let r = - crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts)) - .await - .with_context(|| format!("Parsing layer blob {}", layer.digest()))?; + let w = + crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts)); + let (r, driver) = tokio::join!(w, driver); + let r = r.with_context(|| format!("Parsing layer blob {}", layer.digest()))?; + driver?; layer_commits.push(r.commit); if !r.filtered.is_empty() { layer_filtered_content.insert(layer.digest().to_string(), r.filtered); @@ -260,7 +272,7 @@ impl LayeredImageImporter { } // We're done with the proxy, make sure it didn't have any errors. - self.proxy.finalize().await?; + proxy.finalize().await?; let serialized_manifest = serde_json::to_string(&import.manifest)?; let mut metadata = HashMap::new();