diff --git a/lib/src/container/unencapsulate.rs b/lib/src/container/unencapsulate.rs index a5c68aae..f7da6082 100644 --- a/lib/src/container/unencapsulate.rs +++ b/lib/src/container/unencapsulate.rs @@ -192,41 +192,79 @@ pub async fn unencapsulate_from_manifest( return Err(anyhow!("containers-policy.json specifies a default of `insecureAcceptAnything`; refusing usage")); } let options = options.unwrap_or_default(); + let remote = match &imgref.sigverify { + SignatureSource::OstreeRemote(remote) => Some(remote.clone()), + SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => None, + }; let commit_layer = manifest .layers() .last() .ok_or_else(|| anyhow!("No layers found"))?; + let component_layers: Vec<_> = manifest + .layers() + .iter() + .filter(|&l| l != commit_layer) + .collect(); + let component_layer_size = component_layers.iter().fold(0, |acc, s| acc + s.size()); event!( Level::DEBUG, - "commit blob digest:{} size: {}", + "commit blob digest:{} size: {} components: {} size: {}", commit_layer.digest().as_str(), - commit_layer.size() + commit_layer.size(), + component_layers.len(), + component_layer_size, ); let mut proxy = ImageProxy::new().await?; let oi = proxy.open_image(&imgref.imgref.to_string()).await?; - let (blob, driver) = fetch_layer_decompress(&mut proxy, &oi, commit_layer).await?; - let blob = ProgressReader { - reader: blob, - progress: options.progress, - }; - let blob = tokio_util::io::SyncIoBridge::new(blob); - let remote = match &imgref.sigverify { - SignatureSource::OstreeRemote(remote) => Some(remote.clone()), - SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => None, - }; - let mut importer = crate::tar::Importer::new(&repo, remote); + let (tx, rx) = tokio::sync::mpsc::channel(1); + let repo = repo.clone(); let import = crate::tokio_util::spawn_blocking_cancellable(move |cancellable| { - let mut archive = tar::Archive::new(blob); + let mut rx = rx; + let mut importer = crate::tar::Importer::new(&repo, remote); let txn = repo.auto_transaction(Some(cancellable))?; + + // First, import the commit + let commit_blob = rx.blocking_recv().unwrap(); + let commit_blob = tokio_util::io::SyncIoBridge::new(commit_blob); + let mut archive = tar::Archive::new(commit_blob); importer.import_commit(&mut archive, Some(cancellable))?; + + // Then, all component/split blobs + while let Some(blob) = rx.blocking_recv() { + let blob = tokio_util::io::SyncIoBridge::new(blob); + let mut archive = tar::Archive::new(blob); + importer.import_objects(&mut archive, Some(cancellable))?; + } + let checksum: String = importer.finish_import(); txn.commit(Some(cancellable))?; repo.mark_commit_partial(&checksum, false)?; Ok::<_, anyhow::Error>(checksum) }); - let (import, driver) = tokio::join!(import, driver); - driver?; - let ostree_commit = import.with_context(|| format!("Parsing blob {}", layer.digest()))?; + for &layer in std::iter::once(&commit_layer).chain(component_layers.iter()) { + let (blob, driver) = fetch_layer_decompress(&mut proxy, &oi, commit_layer).await?; + let blob = ProgressReader { + reader: blob, + progress: options.progress, + }; + if tx.send(blob).await.is_err() { + drop(tx); + return match import.await? { + Ok(_) => { + return Err(anyhow::anyhow!( + "internal error: import worker thread did not set error" + )) + } + Err(e) => Err(e), + }; + } + driver.await?; + } + drop(tx); + + let import = import.await?; + let ostree_commit = + import.with_context(|| format!("Parsing blob {}", commit_layer.digest()))?; // FIXME write ostree commit after proxy finalization proxy.finalize().await?; event!(Level::DEBUG, "created commit {}", ostree_commit);