From dca2c991db290b18de88a142ed9eb42225f70537 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Fri, 9 Aug 2024 14:17:16 +0800 Subject: [PATCH] vmm: support new sandbox controller api and streaming io Signed-off-by: Abel Feng --- vmm/common/Cargo.toml | 4 +- vmm/common/build.rs | 2 + vmm/common/src/api/mod.rs | 3 + .../containerd/types/transfer/data.proto | 29 ++ .../src/protos/google/protobuf/any.proto | 9 +- vmm/common/src/protos/streaming.proto | 31 ++ vmm/sandbox/Cargo.lock | 80 ++- vmm/sandbox/Cargo.toml | 19 +- vmm/sandbox/src/cloud_hypervisor/hooks.rs | 2 +- vmm/sandbox/src/container/handler/io.rs | 2 +- vmm/sandbox/src/qemu/hooks.rs | 2 +- vmm/sandbox/src/sandbox.rs | 8 + vmm/sandbox/src/stratovirt/hooks.rs | 2 +- vmm/task/Cargo.lock | 65 ++- vmm/task/Cargo.toml | 4 + vmm/task/src/container.rs | 3 +- vmm/task/src/io.rs | 35 +- vmm/task/src/main.rs | 14 +- vmm/task/src/streaming.rs | 482 ++++++++++++++++++ 19 files changed, 733 insertions(+), 63 deletions(-) create mode 100644 vmm/common/src/protos/containerd/types/transfer/data.proto create mode 100644 vmm/common/src/protos/streaming.proto create mode 100644 vmm/task/src/streaming.rs diff --git a/vmm/common/Cargo.toml b/vmm/common/Cargo.toml index 468df0d5..1e84f42e 100644 --- a/vmm/common/Cargo.toml +++ b/vmm/common/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" panic = 'abort' [dependencies] -containerd-sandbox = {git="https://github.com/kuasar-io/rust-extensions.git"} +containerd-sandbox = { git = "https://github.com/kuasar-io/rust-extensions.git" } serde = "1.0.139" lazy_static = "1.4.0" nix = "0.24.1" @@ -20,4 +20,4 @@ async-trait = "0.1" regex = "1.5.6" [build-dependencies] -ttrpc-codegen = "0.4" +ttrpc-codegen = { git = "https://github.com/kuasar-io/ttrpc-rust.git", branch = "v0.7.1-kuasar" } diff --git a/vmm/common/build.rs b/vmm/common/build.rs index 91c07a46..3e108bed 100644 --- a/vmm/common/build.rs +++ b/vmm/common/build.rs @@ -25,6 +25,8 @@ fn main() { "src/protos/google/protobuf/descriptor.proto", "src/protos/google/protobuf/empty.proto", "src/protos/google/protobuf/timestamp.proto", + "src/protos/streaming.proto", + "src/protos/containerd/types/transfer/data.proto", ]; Codegen::new() diff --git a/vmm/common/src/api/mod.rs b/vmm/common/src/api/mod.rs index 9b760a21..1ef8dcd7 100644 --- a/vmm/common/src/api/mod.rs +++ b/vmm/common/src/api/mod.rs @@ -15,10 +15,13 @@ limitations under the License. */ pub mod any; +pub mod data; pub mod descriptor; pub mod empty; pub mod events; pub mod fieldpath; pub mod sandbox; pub mod sandbox_ttrpc; +pub mod streaming; +pub mod streaming_ttrpc; pub mod timestamp; diff --git a/vmm/common/src/protos/containerd/types/transfer/data.proto b/vmm/common/src/protos/containerd/types/transfer/data.proto new file mode 100644 index 00000000..0d7e223a --- /dev/null +++ b/vmm/common/src/protos/containerd/types/transfer/data.proto @@ -0,0 +1,29 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +syntax = "proto3"; + +package containerd.types.transfer; + +option go_package = "github.com/containerd/containerd/v2/api/types/transfer"; + +message Data { + bytes data = 1; +} + +message WindowUpdate { + int32 update = 1; +} diff --git a/vmm/common/src/protos/google/protobuf/any.proto b/vmm/common/src/protos/google/protobuf/any.proto index 6ed8a23c..c9be8541 100644 --- a/vmm/common/src/protos/google/protobuf/any.proto +++ b/vmm/common/src/protos/google/protobuf/any.proto @@ -33,7 +33,7 @@ syntax = "proto3"; package google.protobuf; option csharp_namespace = "Google.Protobuf.WellKnownTypes"; -option go_package = "google.golang.org/protobuf/types/known/anypb"; +option go_package = "github.com/golang/protobuf/ptypes/any"; option java_package = "com.google.protobuf"; option java_outer_classname = "AnyProto"; option java_multiple_files = true; @@ -77,13 +77,10 @@ option objc_class_prefix = "GPB"; // Example 4: Pack and unpack a message in Go // // foo := &pb.Foo{...} -// any, err := anypb.New(foo) -// if err != nil { -// ... -// } +// any, err := ptypes.MarshalAny(foo) // ... // foo := &pb.Foo{} -// if err := any.UnmarshalTo(foo); err != nil { +// if err := ptypes.UnmarshalAny(any, foo); err != nil { // ... // } // diff --git a/vmm/common/src/protos/streaming.proto b/vmm/common/src/protos/streaming.proto new file mode 100644 index 00000000..f9a73b01 --- /dev/null +++ b/vmm/common/src/protos/streaming.proto @@ -0,0 +1,31 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +syntax = "proto3"; + +package containerd.services.streaming.v1; + +import "google/protobuf/any.proto"; + +option go_package = "github.com/containerd/containerd/v2/api/services/streaming/v1;streaming"; + +service Streaming { + rpc Stream(stream google.protobuf.Any) returns (stream google.protobuf.Any); +} + +message StreamInit { + string id = 1; +} diff --git a/vmm/sandbox/Cargo.lock b/vmm/sandbox/Cargo.lock index 4d2d79fd..d18e3a45 100644 --- a/vmm/sandbox/Cargo.lock +++ b/vmm/sandbox/Cargo.lock @@ -222,6 +222,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "1.3.2" @@ -318,6 +324,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "cgroups-rs" version = "0.2.11" @@ -426,11 +438,12 @@ dependencies = [ [[package]] name = "containerd-sandbox" version = "0.1.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#a0fac88f1594ae9694d4cec65b21c57108915c70" +source = "git+https://github.com/kuasar-io/rust-extensions.git#53b4ca86b3461efb22b881b891e41fb45e264c7f" dependencies = [ "anyhow", "async-stream", "async-trait", + "base64 0.22.1", "futures", "go-flag", "libc", @@ -454,7 +467,7 @@ dependencies = [ [[package]] name = "containerd-shim" version = "0.3.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#a0fac88f1594ae9694d4cec65b21c57108915c70" +source = "git+https://github.com/kuasar-io/rust-extensions.git#53b4ca86b3461efb22b881b891e41fb45e264c7f" dependencies = [ "async-trait", "cgroups-rs 0.2.11", @@ -465,7 +478,7 @@ dependencies = [ "lazy_static", "libc", "log", - "nix 0.25.1", + "nix 0.28.0", "oci-spec", "page_size", "pin-project-lite", @@ -485,12 +498,12 @@ dependencies = [ [[package]] name = "containerd-shim-protos" version = "0.2.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#a0fac88f1594ae9694d4cec65b21c57108915c70" +source = "git+https://github.com/kuasar-io/rust-extensions.git#53b4ca86b3461efb22b881b891e41fb45e264c7f" dependencies = [ "async-trait", "protobuf 3.2.0", "ttrpc", - "ttrpc-codegen", + "ttrpc-codegen 0.4.2", ] [[package]] @@ -1207,6 +1220,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -1382,6 +1404,19 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "nix" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" +dependencies = [ + "bitflags 2.3.3", + "cfg-if 1.0.0", + "cfg_aliases", + "libc", + "memoffset 0.9.1", +] + [[package]] name = "num-traits" version = "0.2.16" @@ -1561,7 +1596,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "059a34f111a9dee2ce1ac2826a68b24601c4298cfeb1a587c3cb493d5ab46f52" dependencies = [ "libc", - "nix 0.26.2", + "nix 0.25.1", ] [[package]] @@ -1848,7 +1883,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b360919a24ea5fc02fa762cb01bd8f43b643fee51c585f763257773b4dc5a9e8" dependencies = [ - "base64", + "base64 0.13.1", "serde", "serde_json", ] @@ -2394,7 +2429,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.13.1", "bytes 1.4.0", "futures-core", "futures-util", @@ -2549,6 +2584,17 @@ dependencies = [ "tokio-vsock", ] +[[package]] +name = "ttrpc-codegen" +version = "0.4.1" +source = "git+https://github.com/kuasar-io/ttrpc-rust.git?branch=v0.7.1-kuasar#db83ba89c4e315a680860341080deec0aa400609" +dependencies = [ + "protobuf 2.28.0", + "protobuf-codegen 3.2.0", + "protobuf-support", + "ttrpc-compiler 0.6.1 (git+https://github.com/kuasar-io/ttrpc-rust.git?branch=v0.7.1-kuasar)", +] + [[package]] name = "ttrpc-codegen" version = "0.4.2" @@ -2558,7 +2604,7 @@ dependencies = [ "protobuf 2.28.0", "protobuf-codegen 3.2.0", "protobuf-support", - "ttrpc-compiler", + "ttrpc-compiler 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2576,6 +2622,20 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ttrpc-compiler" +version = "0.6.1" +source = "git+https://github.com/kuasar-io/ttrpc-rust.git?branch=v0.7.1-kuasar#db83ba89c4e315a680860341080deec0aa400609" +dependencies = [ + "derive-new", + "prost 0.8.0", + "prost-build 0.8.0", + "prost-types 0.8.0", + "protobuf 2.28.0", + "protobuf-codegen 2.28.0", + "tempfile", +] + [[package]] name = "unicode-bidi" version = "0.3.14" @@ -2665,7 +2725,7 @@ dependencies = [ "regex", "serde", "ttrpc", - "ttrpc-codegen", + "ttrpc-codegen 0.4.1", ] [[package]] diff --git a/vmm/sandbox/Cargo.toml b/vmm/sandbox/Cargo.toml index d955549b..7c2e269d 100644 --- a/vmm/sandbox/Cargo.toml +++ b/vmm/sandbox/Cargo.toml @@ -8,14 +8,14 @@ edition = "2021" panic = 'abort' [build-dependencies] -built = { version = "0.7.0", features=["cargo-lock", "dependency-tree", "git2", "chrono", "semver"] } +built = { version = "0.7.0", features = ["cargo-lock", "dependency-tree", "git2", "chrono", "semver"] } [dependencies] -built = { version = "0.7.0", features=["cargo-lock", "dependency-tree", "git2", "chrono", "semver"] } -clap = { version="4.4.2", features=["derive"] } +built = { version = "0.7.0", features = ["cargo-lock", "dependency-tree", "git2", "chrono", "semver"] } +clap = { version = "4.4.2", features = ["derive"] } tokio = { version = "1.19.2", features = ["full"] } -containerd-sandbox = {git="https://github.com/kuasar-io/rust-extensions.git"} -containerd-shim = { git="https://github.com/kuasar-io/rust-extensions.git", features=["async"] } +containerd-sandbox = { git = "https://github.com/kuasar-io/rust-extensions.git" } +containerd-shim = { git = "https://github.com/kuasar-io/rust-extensions.git", features = ["async"] } vmm-common = { path = "../common" } bytefmt = "0.1.7" async-trait = "0.1.56" @@ -38,7 +38,7 @@ uuid = { version = "1.1.2", features = ["v4"] } unshare = { version = "0.7.0" } os_pipe = { version = "0.9.2" } qapi = { version = "0.8.0", features = ["qmp", "async-tokio-all"] } -qapi-spec = {version = "0.3.1"} +qapi-spec = { version = "0.3.1" } sandbox-derive = { path = "derive" } api_client = { git = "https://github.com/cloud-hypervisor/cloud-hypervisor.git" } rtnetlink = "0.13.1" @@ -52,17 +52,16 @@ hostname = "0.3" path-clean = "1.0.1" [[bin]] -name = "qemu" +name = "qemu" path = "src/bin/qemu/main.rs" [[bin]] -name = "cloud_hypervisor" +name = "cloud_hypervisor" path = "src/bin/cloud_hypervisor/main.rs" [[bin]] -name = "stratovirt" +name = "stratovirt" path = "src/bin/stratovirt/main.rs" [dev-dependencies] temp-dir = "0.1.11" - diff --git a/vmm/sandbox/src/cloud_hypervisor/hooks.rs b/vmm/sandbox/src/cloud_hypervisor/hooks.rs index 19800cbb..67e1ccaa 100644 --- a/vmm/sandbox/src/cloud_hypervisor/hooks.rs +++ b/vmm/sandbox/src/cloud_hypervisor/hooks.rs @@ -32,7 +32,7 @@ impl Hooks for CloudHypervisorHooks { } async fn post_start(&self, sandbox: &mut KuasarSandbox) -> Result<()> { - sandbox.data.task_address = sandbox.vm.agent_socket.to_string(); + sandbox.data.task_address = format!("ttrpc+{}", sandbox.vm.agent_socket); // sync clock sandbox.sync_clock().await; Ok(()) diff --git a/vmm/sandbox/src/container/handler/io.rs b/vmm/sandbox/src/container/handler/io.rs index 3dae5870..8b974e60 100644 --- a/vmm/sandbox/src/container/handler/io.rs +++ b/vmm/sandbox/src/container/handler/io.rs @@ -90,7 +90,7 @@ pub async fn attach_pipe( sandbox: &mut KuasarSandbox, io_devices: &mut Vec, ) -> Result { - let name = if !path.is_empty() && !path.contains("vsock") { + let name = if !path.is_empty() && !path.contains("://") { let (id, chardev_id) = sandbox.hot_attach_pipe(path).await?; io_devices.push(id); chardev_id diff --git a/vmm/sandbox/src/qemu/hooks.rs b/vmm/sandbox/src/qemu/hooks.rs index 2e53b604..8bb6f9f7 100644 --- a/vmm/sandbox/src/qemu/hooks.rs +++ b/vmm/sandbox/src/qemu/hooks.rs @@ -44,7 +44,7 @@ impl Hooks for QemuHooks { } async fn post_start(&self, sandbox: &mut KuasarSandbox) -> Result<()> { - sandbox.data.task_address = sandbox.vm.agent_socket.to_string(); + sandbox.data.task_address = format!("ttrpc+{}", sandbox.vm.agent_socket); // sync clock sandbox.sync_clock().await; Ok(()) diff --git a/vmm/sandbox/src/sandbox.rs b/vmm/sandbox/src/sandbox.rs index fa514274..ccaf3a1f 100644 --- a/vmm/sandbox/src/sandbox.rs +++ b/vmm/sandbox/src/sandbox.rs @@ -254,6 +254,14 @@ where Ok(()) } + async fn update(&self, id: &str, data: SandboxData) -> Result<()> { + let sandbox_mutex = self.sandbox(id).await?; + let mut sandbox = sandbox_mutex.lock().await; + sandbox.data = data; + sandbox.dump().await?; + Ok(()) + } + async fn sandbox(&self, id: &str) -> Result>> { Ok(self .sandboxes diff --git a/vmm/sandbox/src/stratovirt/hooks.rs b/vmm/sandbox/src/stratovirt/hooks.rs index 983f998a..0eb4e4e9 100644 --- a/vmm/sandbox/src/stratovirt/hooks.rs +++ b/vmm/sandbox/src/stratovirt/hooks.rs @@ -42,7 +42,7 @@ impl Hooks for StratoVirtHooks { } async fn post_start(&self, sandbox: &mut KuasarSandbox) -> Result<()> { - sandbox.data.task_address = sandbox.vm.agent_socket.to_string(); + sandbox.data.task_address = format!("ttrpc+{}", sandbox.vm.agent_socket); // sync clock sandbox.sync_clock().await; Ok(()) diff --git a/vmm/task/Cargo.lock b/vmm/task/Cargo.lock index 9a136500..1b98dada 100644 --- a/vmm/task/Cargo.lock +++ b/vmm/task/Cargo.lock @@ -151,6 +151,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "1.3.2" @@ -243,11 +249,12 @@ dependencies = [ [[package]] name = "containerd-sandbox" version = "0.1.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#b9ad8e197385b72ada6c1b8482c155606d26c803" +source = "git+https://github.com/kuasar-io/rust-extensions.git#53b4ca86b3461efb22b881b891e41fb45e264c7f" dependencies = [ "anyhow", "async-stream", "async-trait", + "base64 0.22.1", "futures", "go-flag", "libc", @@ -271,7 +278,7 @@ dependencies = [ [[package]] name = "containerd-shim" version = "0.3.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#b9ad8e197385b72ada6c1b8482c155606d26c803" +source = "git+https://github.com/kuasar-io/rust-extensions.git#53b4ca86b3461efb22b881b891e41fb45e264c7f" dependencies = [ "async-trait", "cgroups-rs", @@ -302,12 +309,12 @@ dependencies = [ [[package]] name = "containerd-shim-protos" version = "0.2.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#b9ad8e197385b72ada6c1b8482c155606d26c803" +source = "git+https://github.com/kuasar-io/rust-extensions.git#53b4ca86b3461efb22b881b891e41fb45e264c7f" dependencies = [ "async-trait", "protobuf 3.2.0", "ttrpc", - "ttrpc-codegen", + "ttrpc-codegen 0.4.2", ] [[package]] @@ -918,15 +925,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "memoffset" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" -dependencies = [ - "autocfg", -] - [[package]] name = "memoffset" version = "0.8.0" @@ -1116,8 +1114,6 @@ dependencies = [ "bitflags 1.3.2", "cfg-if 1.0.0", "libc", - "memoffset 0.7.1", - "pin-utils", "static_assertions", ] @@ -1319,7 +1315,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "059a34f111a9dee2ce1ac2826a68b24601c4298cfeb1a587c3cb493d5ab46f52" dependencies = [ "libc", - "nix 0.26.2", + "nix 0.28.0", ] [[package]] @@ -1623,7 +1619,7 @@ dependencies = [ [[package]] name = "runc" version = "0.2.0" -source = "git+https://github.com/kuasar-io/rust-extensions.git#b9ad8e197385b72ada6c1b8482c155606d26c803" +source = "git+https://github.com/kuasar-io/rust-extensions.git#53b4ca86b3461efb22b881b891e41fb45e264c7f" dependencies = [ "async-trait", "futures", @@ -1954,7 +1950,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.13.1", "bytes 1.4.0", "futures-core", "futures-util", @@ -2093,8 +2089,7 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "ttrpc" version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a35f22a2964bea14afee161665bb260b83cb48e665e0260ca06ec0e775c8b06c" +source = "git+https://github.com/kuasar-io/ttrpc-rust.git?branch=v0.7.1-kuasar#db83ba89c4e315a680860341080deec0aa400609" dependencies = [ "async-trait", "byteorder", @@ -2109,6 +2104,17 @@ dependencies = [ "tokio-vsock", ] +[[package]] +name = "ttrpc-codegen" +version = "0.4.1" +source = "git+https://github.com/kuasar-io/ttrpc-rust.git?branch=v0.7.1-kuasar#db83ba89c4e315a680860341080deec0aa400609" +dependencies = [ + "protobuf 2.28.0", + "protobuf-codegen 3.2.0", + "protobuf-support", + "ttrpc-compiler 0.6.1 (git+https://github.com/kuasar-io/ttrpc-rust.git?branch=v0.7.1-kuasar)", +] + [[package]] name = "ttrpc-codegen" version = "0.4.2" @@ -2118,7 +2124,7 @@ dependencies = [ "protobuf 2.28.0", "protobuf-codegen 3.2.0", "protobuf-support", - "ttrpc-compiler", + "ttrpc-compiler 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2136,6 +2142,20 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ttrpc-compiler" +version = "0.6.1" +source = "git+https://github.com/kuasar-io/ttrpc-rust.git?branch=v0.7.1-kuasar#db83ba89c4e315a680860341080deec0aa400609" +dependencies = [ + "derive-new", + "prost 0.8.0", + "prost-build 0.8.0", + "prost-types 0.8.0", + "protobuf 2.28.0", + "protobuf-codegen 2.28.0", + "tempfile", +] + [[package]] name = "unicode-ident" version = "1.0.9" @@ -2177,7 +2197,7 @@ dependencies = [ "regex", "serde", "ttrpc", - "ttrpc-codegen", + "ttrpc-codegen 0.4.1", ] [[package]] @@ -2200,6 +2220,7 @@ dependencies = [ "nix 0.28.0", "oci-spec", "pin-project-lite", + "protobuf 3.2.0", "rtnetlink", "runc", "serde", diff --git a/vmm/task/Cargo.toml b/vmm/task/Cargo.toml index cc21ba43..e5723838 100644 --- a/vmm/task/Cargo.toml +++ b/vmm/task/Cargo.toml @@ -25,6 +25,7 @@ netlink-packet-route = "0.15" netlink-packet-core = "0.5.0" ipnetwork = "0.20" anyhow = { version = "1.0.66", default-features = false, features = ["std", "backtrace"] } +protobuf = "3.2" # Async dependencies async-trait = { version = "0.1.51" } @@ -37,3 +38,6 @@ ttrpc = { version = "0.7", features = ["async"] } containerd-shim = { git = "https://github.com/kuasar-io/rust-extensions.git", features = ["async"] } runc = { git = "https://github.com/kuasar-io/rust-extensions.git", features = ["async"] } + +[patch.crates-io] +ttrpc = { git = "https://github.com/kuasar-io/ttrpc-rust.git", branch = "v0.7.1-kuasar" } diff --git a/vmm/task/src/container.rs b/vmm/task/src/container.rs index 917034ef..2ca86b49 100644 --- a/vmm/task/src/container.rs +++ b/vmm/task/src/container.rs @@ -311,7 +311,8 @@ impl ProcessFactory for KuasarExecFactory { async fn create(&self, req: &ExecProcessRequest) -> Result { let p = get_spec_from_request(req)?; let stdio = match read_io(&self.bundle, req.id(), Some(req.exec_id())).await { - Ok(io) => Stdio::new(&io.stdin, &io.stdout, &io.stderr, io.terminal), + // terminal is still determined from request + Ok(io) => Stdio::new(&io.stdin, &io.stdout, &io.stderr, req.terminal()), Err(_) => Stdio::new(req.stdin(), req.stdout(), req.stderr(), req.terminal()), }; let stdio = convert_stdio(&stdio).await?; diff --git a/vmm/task/src/io.rs b/vmm/task/src/io.rs index 3fb4a246..19a2c3fe 100644 --- a/vmm/task/src/io.rs +++ b/vmm/task/src/io.rs @@ -49,7 +49,11 @@ use tokio::{ }; use tokio_vsock::{VsockListener, VsockStream}; -use crate::{device::SYSTEM_DEV_PATH, vsock}; +use crate::{ + device::SYSTEM_DEV_PATH, + streaming::{get_output, get_stdin, remove_channel}, + vsock, +}; pub struct ProcessIO { pub io: Option>, @@ -57,6 +61,7 @@ pub struct ProcessIO { } const VSOCK: &str = "vsock"; +const STREAMING: &str = "streaming"; pub fn create_io( id: &str, @@ -102,7 +107,7 @@ pub fn create_io( }; pio.io = Some(Arc::new(io)); pio.copy = false; - } else if scheme.contains(VSOCK) { + } else { let opt = IOOption { open_stdin: !stdio.stdin.is_empty(), open_stdout: !stdio.stdout.is_empty(), @@ -238,7 +243,15 @@ where { let src = from; tokio::spawn(async move { - let dst: Box = if to.contains(VSOCK) { + let dst: Box = if to.contains(STREAMING) { + match get_output(&to).await { + Ok(output) => Box::new(output), + Err(e) => { + error!("failed to get streaming by {}, {}", to, e); + return; + } + } + } else if to.contains(VSOCK) { tokio::select! { _ = exit_signal.wait() => { debug!("container already exited, maybe nobody should connect vsock"); @@ -264,6 +277,9 @@ where } }; copy(src, dst, exit_signal, on_close).await; + if to.contains(STREAMING) { + remove_channel(&to).await.unwrap_or_default(); + } debug!("finished copy io from container to {}", to); }); Ok(()) @@ -281,7 +297,15 @@ where { let dst = to; tokio::spawn(async move { - let src: Box = if from.contains(VSOCK) { + let src: Box = if from.contains(STREAMING) { + match get_stdin(&from).await { + Ok(stdin) => Box::new(stdin), + Err(e) => { + error!("failed to get streaming by {}, {}", from, e); + return; + } + } + } else if from.contains(VSOCK) { tokio::select! { _ = exit_signal.wait() => { debug!("container already exited, maybe nobody should connect"); @@ -307,6 +331,9 @@ where } }; copy(src, dst, exit_signal, on_close).await; + if from.contains(STREAMING) { + remove_channel(&from).await.unwrap_or_default(); + } debug!("finished copy io from {} to container", from); }); Ok(()) diff --git a/vmm/task/src/main.rs b/vmm/task/src/main.rs index 1964738e..d8d9f0fc 100644 --- a/vmm/task/src/main.rs +++ b/vmm/task/src/main.rs @@ -39,11 +39,13 @@ use nix::{ unistd::{fork, getpid, pause, pipe, ForkResult, Pid}, }; use signal_hook_tokio::Signals; +use streaming::STREAMING_SERVICE; use tokio::{fs::File, sync::mpsc::channel}; use vmm_common::{ - api::sandbox_ttrpc::create_sandbox_service, mount::mount, ETC_RESOLV, HOSTNAME_FILENAME, - IPC_NAMESPACE, KUASAR_STATE_DIR, PID_NAMESPACE, RESOLV_FILENAME, SANDBOX_NS_PATH, - UTS_NAMESPACE, + api::{sandbox_ttrpc::create_sandbox_service, streaming_ttrpc::create_streaming}, + mount::mount, + ETC_RESOLV, HOSTNAME_FILENAME, IPC_NAMESPACE, KUASAR_STATE_DIR, PID_NAMESPACE, RESOLV_FILENAME, + SANDBOX_NS_PATH, UTS_NAMESPACE, }; use crate::{ @@ -64,6 +66,7 @@ mod netlink; mod sandbox; mod sandbox_service; mod stream; +mod streaming; mod task; mod util; mod vsock; @@ -374,10 +377,13 @@ async fn create_ttrpc_server() -> anyhow::Result { sandbox.handle_localhost().await?; let sandbox_service = create_sandbox_service(Arc::new(Box::new(sandbox))); + let streaming_service = create_streaming(Arc::new(Box::new(STREAMING_SERVICE.clone()))); + Ok(Server::new() .bind("vsock://-1:1024")? .register_service(task_service) - .register_service(sandbox_service)) + .register_service(sandbox_service) + .register_service(streaming_service)) } async fn setup_sandbox_ns(share_pidns: bool) -> Result<()> { diff --git a/vmm/task/src/streaming.rs b/vmm/task/src/streaming.rs new file mode 100644 index 00000000..0bff6808 --- /dev/null +++ b/vmm/task/src/streaming.rs @@ -0,0 +1,482 @@ +/* +Copyright 2022 The Kuasar Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +use std::{ + collections::HashMap, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use async_trait::async_trait; +use containerd_shim::protos::protobuf::{CodedInputStream, Message}; +use futures::{ready, Future}; +use lazy_static::lazy_static; +use log::{debug, info, warn}; +use protobuf::MessageFull; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + pin, select, + sync::{ + mpsc::{channel, error::SendError, OwnedPermit, Receiver, Sender}, + Mutex, Notify, + }, +}; +use ttrpc::{asynchronous::ServerStream, r#async::TtrpcContext}; +use vmm_common::{ + api, + api::{ + any::Any, + data::{Data, WindowUpdate}, + empty::Empty, + streaming::StreamInit, + }, +}; + +macro_rules! new_any { + ($ty:ty, $value:expr) => {{ + let mut a = vmm_common::api::any::Any::new(); + a.type_url = <$ty>::descriptor().full_name().to_string(); + a.value = $value; + a + }}; + ($ty:ty) => {{ + let mut a = vmm_common::api::any::Any::new(); + a.type_url = <$ty>::descriptor().full_name().to_string(); + a.value = <$ty>::new().write_to_bytes().unwrap_or_default(); + a + }}; +} + +lazy_static! { + pub static ref STREAMING_SERVICE: Service = Service { + ios: Arc::new(Mutex::new(HashMap::default())) + }; +} + +const WINDOW_SIZE: i32 = 32 * 1024; + +#[derive(Clone)] +pub struct Service { + ios: Arc>>, +} + +pub struct IOChannel { + sender: Option>>, + receiver: Option>>, + remaining_data: Option, + preemption_sender: Option>, + notifier: Arc, +} + +pub struct PreemptableReceiver { + receiver: Receiver>, + preempt: Receiver<()>, +} + +impl PreemptableReceiver { + pub fn new(rx: Receiver>, preempt_rx: Receiver<()>) -> Self { + Self { + receiver: rx, + preempt: preempt_rx, + } + } + + pub async fn recv(&mut self) -> ttrpc::Result>> { + select! { + res = self.receiver.recv() => { + Ok(res) + } + _ = self.preempt.recv() => { + Err(ttrpc::Error::Others("channel is preempted".to_string())) + } + } + } +} + +impl IOChannel { + pub fn new() -> Self { + let (tx, rx) = channel(128); + Self { + sender: Some(tx), + receiver: Some(rx), + remaining_data: None, + preemption_sender: None, + notifier: Arc::new(Notify::new()), + } + } + + async fn get_or_preempt_receiver(&mut self) -> Option { + if let Some(r) = self.receiver.take() { + let (tx, rx) = channel(1); + let preempt_receiver = PreemptableReceiver::new(r, rx); + self.preemption_sender = Some(tx); + return Some(preempt_receiver); + } + if let Some(r) = self.preemption_sender.take() { + debug!("send preemption message"); + if let Err(e) = r.send(()).await { + warn!("failed to send preemption message: {}", e); + } + } + None + } + + fn return_preempted_receiver(&mut self, r: PreemptableReceiver, remaining_data: Option) { + self.receiver = Some(r.receiver); + self.remaining_data = remaining_data; + self.notifier.notify_one(); + } +} + +#[async_trait] +impl api::streaming_ttrpc::Streaming for Service { + async fn stream( + &self, + _ctx: &TtrpcContext, + mut stream: ServerStream, + ) -> ::ttrpc::Result<()> { + let stream_id = if let Some(i) = stream.recv().await? { + let mut stream_init = StreamInit::new(); + let mut input = CodedInputStream::from_bytes(i.value.as_slice()); + stream_init + .merge_from(&mut input) + .map_err(ttrpc::err_to_others!(e, "failed to unmarshal StreamInit"))?; + stream_init.id + } else { + return Err(ttrpc::Error::Others( + "can not receive streamInit".to_string(), + )); + }; + debug!("handle stream with id {}", stream_id); + let a = new_any!(Empty); + stream.send(&a).await?; + + if stream_id.ends_with("stdin") { + self.handle_stdin(&stream_id, stream).await?; + } else if stream_id.ends_with("stdout") || stream_id.ends_with("stderr") { + self.handle_stdout(&stream_id, stream).await?; + } else { + warn!("unrecognized stream {}", stream_id); + } + + debug!("stream with id {} handle finished", stream_id); + Ok(()) + } +} + +impl Service { + async fn get_or_insert_sender(&self, id: &str) -> ttrpc::Result>> { + let mut ios = self.ios.lock().await; + let ch = ios.entry(id.to_string()).or_insert(IOChannel::new()); + ch.sender.take().ok_or(ttrpc::Error::Others( + "someone is taking the channel sender".to_string(), + )) + } + + async fn preempt_receiver(&self, id: &str) -> ttrpc::Result { + for _i in 0..10 { + let mut ios = self.ios.lock().await; + let ch = ios.entry(id.to_string()).or_insert(IOChannel::new()); + let notifier = ch.notifier.clone(); + if let Some(c) = ch.get_or_preempt_receiver().await { + debug!("io channel {} being preempted", id); + return Ok(c); + } + // Release the lock here so that they can get the lock when return_prempted_receiver + drop(ios); + + notifier.notified().await; + } + + Err(ttrpc::Error::Others( + "failed to preempt io channel".to_string(), + )) + } + + async fn return_preempted_receiver( + &self, + id: &str, + r: PreemptableReceiver, + remaining_data: Option, + ) { + let mut ios = self.ios.lock().await; + if let Some(ch) = ios.get_mut(id) { + ch.return_preempted_receiver(r, remaining_data); + } else { + warn!("io channel removed when return the receiver"); + } + } + + async fn get_remaining_data(&self, id: &str) -> Option { + self.ios + .lock() + .await + .get_mut(id) + .and_then(|x| x.remaining_data.take()) + } + + pub async fn get_stdin(&self, id: &str) -> containerd_shim::Result { + self.ios + .lock() + .await + .get_mut(id) + .ok_or(containerd_shim::Error::NotFoundError( + "can not get stdin stream".to_string(), + ))? + .receiver + .take() + .map(|r| StreamingStdin { receiver: r }) + .ok_or(containerd_shim::Error::Other( + "someone is taking the io channel".to_string(), + )) + } + + pub async fn get_output(&self, id: &str) -> containerd_shim::Result { + self.ios + .lock() + .await + .get_mut(id) + .ok_or(containerd_shim::Error::NotFoundError( + "can not get output stream".to_string(), + ))? + .sender + .take() + .map(|s| StreamingOutput { + sender: s, + permit: None, + }) + .ok_or(containerd_shim::Error::Other( + "someone is taking the io channel".to_string(), + )) + } + + async fn remove_io_channel(&self, id: &str) { + self.ios.lock().await.remove(id); + } + + async fn handle_stdin( + &self, + stream_id: &String, + mut stream: ServerStream, + ) -> ttrpc::Result<()> { + let sender = self.get_or_insert_sender(stream_id).await?; + let mut window = 0i32; + loop { + if window < WINDOW_SIZE { + let mut update = WindowUpdate::new(); + update.update = WINDOW_SIZE; + let update_bytes = match update.write_to_bytes() { + Ok(d) => d, + Err(e) => { + debug!("failed to marshal update of stream {}, {}", stream_id, e); + if let Some(c) = self.ios.lock().await.get_mut(stream_id) { + c.sender = Some(sender); + } + return Err(ttrpc::Error::Others(format!("failed to write data {}", e))); + } + }; + let a = new_any!(WindowUpdate, update_bytes); + if let Err(e) = stream.send(&a).await { + debug!("failed to send update of stream {}, {}", stream_id, e); + if let Some(c) = self.ios.lock().await.get_mut(stream_id) { + c.sender = Some(sender); + } + return Err(e); + } + window += WINDOW_SIZE; + } + match stream.recv().await? { + Some(d) => { + let data_bytes = { + let mut data = Data::new(); + let mut input = CodedInputStream::from_bytes(d.value.as_slice()); + data.merge_from(&mut input) + .map_err(ttrpc::err_to_others!(e, "data format error"))?; + data.data + }; + let len: i32 = data_bytes.len().try_into().unwrap_or_default(); + if let Err(e) = sender.send(data_bytes).await { + return Err(ttrpc::Error::Others(format!("failed to send data {}", e))); + } + window -= len; + } + None => { + self.ios.lock().await.remove(stream_id); + return Ok(()); + } + } + } + } + + async fn handle_stdout( + &self, + stream_id: &String, + stream: ServerStream, + ) -> ttrpc::Result<()> { + let mut receiver = self.preempt_receiver(stream_id).await?; + if let Some(a) = self.get_remaining_data(stream_id).await { + if let Err(e) = stream.send(&a).await { + debug!("failed to send data of stream {}, {}", stream_id, e); + self.return_preempted_receiver(stream_id, receiver, Some(a)) + .await; + return Err(e); + } + } + loop { + let r = if let Ok(res) = receiver.recv().await { + res + } else { + self.return_preempted_receiver(stream_id, receiver, None) + .await; + info!("stream {} is preempted", stream_id); + return Err(ttrpc::Error::Others("channel is preempted".to_string())); + }; + match r { + Some(d) => { + if d.is_empty() { + return Ok(()); + } + let mut data = Data::new(); + data.data = d; + let data_bytes = match data.write_to_bytes() { + Ok(b) => b, + Err(e) => { + debug!("failed to marshal data of stream {}, {}", stream_id, e); + self.return_preempted_receiver(stream_id, receiver, None) + .await; + return Err(ttrpc::Error::Others(format!( + "failed to write data {}", + e + ))); + } + }; + let a = new_any!(Data, data_bytes); + match stream.send(&a).await { + Ok(_) => {} + Err(e) => { + debug!("failed to send data of stream {}, {}", stream_id, e); + self.return_preempted_receiver(stream_id, receiver, Some(a)) + .await; + return Err(e); + } + }; + } + None => { + return Ok(()); + } + } + } + } +} + +pub async fn get_stdin(url: &str) -> containerd_shim::Result { + let id = get_id(url)?; + STREAMING_SERVICE.get_stdin(id).await +} + +pub async fn remove_channel(url: &str) -> containerd_shim::Result<()> { + let id = get_id(url)?; + STREAMING_SERVICE.remove_io_channel(id).await; + Ok(()) +} + +pub async fn get_output(url: &str) -> containerd_shim::Result { + let id = get_id(url)?; + STREAMING_SERVICE.get_output(id).await +} + +// url of the streaming should be in the form of +// ttrpc+hvsock://aaa/bbb?id=-stdin +// get_id get the -stdin out of it. +fn get_id(url: &str) -> containerd_shim::Result<&str> { + let id_parts = url.split("id=").collect::>(); + if id_parts.len() != 2 { + return Err(containerd_shim::Error::InvalidArgument( + "streaming url invalid, no id".to_string(), + )); + } + Ok(id_parts[1].trim_matches('&')) +} + +pin_project_lite::pin_project! { + pub struct StreamingStdin { + receiver: Receiver> + } +} + +impl AsyncRead for StreamingStdin { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + let r = ready!(this.receiver.poll_recv(cx)); + match r { + Some(a) => { + buf.put_slice(a.as_slice()); + Poll::Ready(Ok(())) + } + None => Poll::Ready(Ok(())), + } + } +} + +pin_project_lite::pin_project! { + pub struct StreamingOutput { + sender: Sender>, + permit: Option>, SendError<()>>> + Send>>>, + } +} + +impl AsyncWrite for StreamingOutput { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.project(); + let permit_fut = this.permit.get_or_insert_with(|| { + let permit_fut = this.sender.clone().reserve_owned(); + Box::pin(permit_fut) + }); + pin!(permit_fut); + let permit = ready!(permit_fut.poll(cx)); + match permit { + Ok(p) => { + p.send(buf.to_vec()); + *this.permit = None; + Poll::Ready(Ok(buf.len())) + } + Err(e) => { + *this.permit = None; + Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))) + } + } + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } +}