From f9bad8e3501d70d60001338d83779265cfc75de1 Mon Sep 17 00:00:00 2001 From: alesharik Date: Mon, 14 Oct 2024 20:49:01 +0300 Subject: [PATCH 1/8] feat: add pulsar module --- Cargo.toml | 2 + src/lib.rs | 4 + src/pulsar/mod.rs | 232 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 238 insertions(+) create mode 100644 src/pulsar/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 8ea9468..e1c2eb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ victoria_metrics = [] zookeeper = [] cockroach_db = [] kwok = [] +pulsar = [] [dependencies] # TODO: update parse-display after MSRV>=1.80.0 bump of `testcontainer-rs` and `testcontainers-modules` @@ -113,6 +114,7 @@ clickhouse = "0.11.6" vaultrs = "0.7.2" openssl-sys = { version = "0.9.103", features = ["vendored"] } native-tls = { version = "0.2.12", features = ["vendored"] } +pulsar = "6.3" [[example]] name = "postgres" diff --git a/src/lib.rs b/src/lib.rs index 21a1e6a..6ed7e76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,6 +120,10 @@ pub mod parity_parity; #[cfg_attr(docsrs, doc(cfg(feature = "postgres")))] /// **Postgres** (relational database) testcontainer pub mod postgres; +#[cfg(feature = "pulsar")] +#[cfg_attr(docsrs, doc(cfg(feature = "pulsar")))] +/// **Apache Pulsar** (Cloud-Native, Distributed Messaging and Streaming) testcontainer +pub mod pulsar; #[cfg(feature = "rabbitmq")] #[cfg_attr(docsrs, doc(cfg(feature = "rabbitmq")))] /// **rabbitmq** (message broker) testcontainer diff --git a/src/pulsar/mod.rs b/src/pulsar/mod.rs new file mode 100644 index 0000000..cb52ce2 --- /dev/null +++ b/src/pulsar/mod.rs @@ -0,0 +1,232 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use testcontainers::{ + core::{CmdWaitFor, ContainerPort, ContainerState, ExecCommand, Mount, WaitFor}, + Image, TestcontainersError, +}; + +const NAME: &str = "apachepulsar/pulsar"; +const TAG: &str = "2.10.6"; + +const PULSAR_PORT: ContainerPort = ContainerPort::Tcp(6650); +const ADMIN_PORT: ContainerPort = ContainerPort::Tcp(8080); + +/// Module to work with [`Apache Pulsar`] inside of tests. +/// +/// This module is based on the official [`Apache Pulsar docker image`]. +/// +/// # Example +/// ``` +/// use testcontainers_modules::{pulsar, testcontainers::runners::SyncRunner}; +/// +/// let pulsar = pulsar::Pulsar::default().start().unwrap(); +/// let http_port = pulsar.get_host_port_ipv4(6650).unwrap(); +/// +/// // do something with the running pulsar instance.. +/// ``` +/// +/// [`Apache Pulsar`]: https://github.com/apache/pulsar +/// [`Apache Pulsar docker image`]: https://hub.docker.com/r/apachepulsar/pulsar/ +#[derive(Debug, Clone)] +pub struct Pulsar { + data_mount: Mount, + env: BTreeMap, + admin_commands: Vec>, +} + +impl Default for Pulsar { + /** + * Starts an in-memory instance in dev mode, with horrible token values. + * Obviously not to be emulated in production. + */ + fn default() -> Self { + Self { + data_mount: Mount::tmpfs_mount("/pulsar/data"), + env: BTreeMap::new(), + admin_commands: vec![], + } + } +} + +impl Pulsar { + fn with_config(mut self, name: impl Into, value: impl Into) -> Self { + self.env + .insert(format!("PULSAR_PREFIX_{}", name.into()), value.into()); + self + } + + fn with_admin_command(mut self, command: impl IntoIterator>) -> Self { + let mut vec: Vec = command.into_iter().map(Into::into).collect(); + vec.insert(0, "bin/pulsar-admin".to_string()); + self.admin_commands.push(vec); + self + } + + fn with_tenant(self, tenant: impl Into) -> Self { + let tenant = tenant.into(); + self.with_admin_command(["tenants", "create", &tenant]) + } + + fn with_namespace(self, namespace: impl Into) -> Self { + let namespace = namespace.into(); + self.with_admin_command(["namespaces", "create", &namespace]) + } + + fn with_topic(self, topic: impl Into) -> Self { + let topic = topic.into(); + self.with_admin_command(["topics", "create", &topic]) + } +} + +impl Image for Pulsar { + fn name(&self) -> &str { + NAME + } + + fn tag(&self) -> &str { + TAG + } + + fn ready_conditions(&self) -> Vec { + vec![ + WaitFor::message_on_stdout("HTTP Service started at"), + WaitFor::message_on_stdout("messaging service is ready"), + ] + } + + fn mounts(&self) -> impl IntoIterator { + [&self.data_mount] + } + + fn env_vars( + &self, + ) -> impl IntoIterator>, impl Into>)> { + &self.env + } + + fn cmd(&self) -> impl IntoIterator>> { + [ + "sh", + "-c", + "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone", + ] + } + + fn exec_after_start( + &self, + _cs: ContainerState, + ) -> Result, TestcontainersError> { + Ok(self + .admin_commands + .iter() + .map(|cmd| ExecCommand::new(cmd).with_cmd_ready_condition(CmdWaitFor::exit_code(0))) + .collect()) + } + + fn expose_ports(&self) -> &[ContainerPort] { + &[PULSAR_PORT, ADMIN_PORT] + } +} + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use pulsar::{ + producer::Message, Consumer, DeserializeMessage, Error, Payload, SerializeMessage, + TokioExecutor, + }; + use serde::{Deserialize, Serialize}; + + use super::*; + use crate::testcontainers::runners::AsyncRunner; + + #[derive(Serialize, Deserialize)] + struct TestData { + data: String, + } + + impl DeserializeMessage for TestData { + type Output = Result; + + fn deserialize_message(payload: &Payload) -> Self::Output { + serde_json::from_slice(&payload.data) + } + } + + impl SerializeMessage for TestData { + fn serialize_message(input: Self) -> Result { + Ok(Message { + payload: serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?, + ..Default::default() + }) + } + } + + #[tokio::test] + async fn pulsar_subscribe_and_publish() -> Result<(), Box> { + let topic = "persistent://test/test-ns/test-topic"; + + let pulsar = Pulsar::default() + .with_tenant("test") + .with_namespace("test/test-ns") + .with_topic(topic) + .start() + .await + .unwrap(); + + let endpoint = format!( + "pulsar://0.0.0.0:{}", + pulsar.get_host_port_ipv4(6650).await? + ); + let client = pulsar::Pulsar::builder(endpoint, TokioExecutor) + .build() + .await?; + + let mut consumer: Consumer = + client.consumer().with_topic(topic).build().await?; + + let mut producer = client.producer().with_topic(topic).build().await?; + + producer + .send_non_blocking(TestData { + data: "test".to_string(), + }) + .await? + .await?; + + let data = consumer.next().await.unwrap()?.deserialize()?; + assert_eq!("test", data.data); + + Ok(()) + } + + #[tokio::test] + async fn pulsar_config() -> Result<(), Box> { + let topic = "persistent://test/test-ns/test-topic"; + + let pulsar = Pulsar::default() + .with_tenant("test") + .with_namespace("test/test-ns") + .with_config("allowAutoTopicCreation", "false") + .start() + .await + .unwrap(); + + let endpoint = format!( + "pulsar://0.0.0.0:{}", + pulsar.get_host_port_ipv4(6650).await? + ); + let client = pulsar::Pulsar::builder(endpoint, TokioExecutor) + .build() + .await?; + + let producer = client.producer().with_topic(topic).build().await; + + match producer { + Ok(_) => panic!("Producer should return error"), + Err(e) => assert_eq!("Connection error: Server error (Some(TopicNotFound)): Topic not found persistent://test/test-ns/test-topic", e.to_string()), + } + + Ok(()) + } +} From 58dab26794b14e8443963b8510b27e27650e631c Mon Sep 17 00:00:00 2001 From: alesharik Date: Tue, 15 Oct 2024 10:45:49 +0300 Subject: [PATCH 2/8] fix: clippy warnings --- src/pulsar/mod.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/pulsar/mod.rs b/src/pulsar/mod.rs index cb52ce2..bb7fed7 100644 --- a/src/pulsar/mod.rs +++ b/src/pulsar/mod.rs @@ -49,30 +49,35 @@ impl Default for Pulsar { } impl Pulsar { - fn with_config(mut self, name: impl Into, value: impl Into) -> Self { + /// Add configuration parameter to Pulsar `conf/standalone.conf` + pub fn with_config(mut self, name: impl Into, value: impl Into) -> Self { self.env .insert(format!("PULSAR_PREFIX_{}", name.into()), value.into()); self } - fn with_admin_command(mut self, command: impl IntoIterator>) -> Self { + /// Runs admin command after container start + pub fn with_admin_command(mut self, command: impl IntoIterator>) -> Self { let mut vec: Vec = command.into_iter().map(Into::into).collect(); vec.insert(0, "bin/pulsar-admin".to_string()); self.admin_commands.push(vec); self } - fn with_tenant(self, tenant: impl Into) -> Self { + /// Creates tenant after container start + pub fn with_tenant(self, tenant: impl Into) -> Self { let tenant = tenant.into(); self.with_admin_command(["tenants", "create", &tenant]) } - fn with_namespace(self, namespace: impl Into) -> Self { + /// Creates namespace after container start + pub fn with_namespace(self, namespace: impl Into) -> Self { let namespace = namespace.into(); self.with_admin_command(["namespaces", "create", &namespace]) } - fn with_topic(self, topic: impl Into) -> Self { + /// Creates topic after container start + pub fn with_topic(self, topic: impl Into) -> Self { let topic = topic.into(); self.with_admin_command(["topics", "create", &topic]) } From e542043048a22cbec48aa8772e3d048d1a7e0772 Mon Sep 17 00:00:00 2001 From: alesharik Date: Tue, 15 Oct 2024 10:50:37 +0300 Subject: [PATCH 3/8] ci: install protoc for pulsar tests --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2462616..31637c7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -107,6 +107,8 @@ jobs: - uses: taiki-e/install-action@v2 with: tool: cargo-hack + - name: Install Protoc # for pulsar tests + uses: arduino/setup-protoc@v3 - name: Install the latest Oracle instant client run: | curl -Lo basic.zip https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip From 3bd381756ee6ca98a014215bbb1f0dbbf7a80ed8 Mon Sep 17 00:00:00 2001 From: alesharik Date: Tue, 15 Oct 2024 10:50:58 +0300 Subject: [PATCH 4/8] docs: add warning to pulsar module that it requires `protoc` compiler to build --- src/pulsar/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pulsar/mod.rs b/src/pulsar/mod.rs index bb7fed7..c98cd4a 100644 --- a/src/pulsar/mod.rs +++ b/src/pulsar/mod.rs @@ -12,6 +12,7 @@ const PULSAR_PORT: ContainerPort = ContainerPort::Tcp(6650); const ADMIN_PORT: ContainerPort = ContainerPort::Tcp(8080); /// Module to work with [`Apache Pulsar`] inside of tests. +/// **Requires protoc to be installed, otherwise will not build.** /// /// This module is based on the official [`Apache Pulsar docker image`]. /// From 566660fef6078860e67006af4afcd95bb3e32f3f Mon Sep 17 00:00:00 2001 From: alesharik Date: Tue, 15 Oct 2024 18:08:56 +0300 Subject: [PATCH 5/8] docs: set correct docs for `Pulsar::default()` --- src/pulsar/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/pulsar/mod.rs b/src/pulsar/mod.rs index c98cd4a..b0b2661 100644 --- a/src/pulsar/mod.rs +++ b/src/pulsar/mod.rs @@ -37,8 +37,7 @@ pub struct Pulsar { impl Default for Pulsar { /** - * Starts an in-memory instance in dev mode, with horrible token values. - * Obviously not to be emulated in production. + * Creates new standalone pulsar container, with `/pulsar/data` as temporary volume */ fn default() -> Self { Self { From 826040f8d873883905288852d6f804336ce9a89a Mon Sep 17 00:00:00 2001 From: alesharik Date: Tue, 15 Oct 2024 18:14:49 +0300 Subject: [PATCH 6/8] refactor: Pulsar::with_config -> with_config_env --- src/pulsar/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/pulsar/mod.rs b/src/pulsar/mod.rs index b0b2661..1b50d76 100644 --- a/src/pulsar/mod.rs +++ b/src/pulsar/mod.rs @@ -49,8 +49,11 @@ impl Default for Pulsar { } impl Pulsar { - /// Add configuration parameter to Pulsar `conf/standalone.conf` - pub fn with_config(mut self, name: impl Into, value: impl Into) -> Self { + /// Add configuration parameter to Pulsar `conf/standalone.conf` through setting environment variable. + /// + /// Container will rewrite `conf/standalone.conf` file using these variables during startup + /// with help of `bin/apply-config-from-env.py` script + pub fn with_config_env(mut self, name: impl Into, value: impl Into) -> Self { self.env .insert(format!("PULSAR_PREFIX_{}", name.into()), value.into()); self @@ -212,7 +215,7 @@ mod tests { let pulsar = Pulsar::default() .with_tenant("test") .with_namespace("test/test-ns") - .with_config("allowAutoTopicCreation", "false") + .with_config_env("allowAutoTopicCreation", "false") .start() .await .unwrap(); From 68a18f00fab328f4f4eacd058dc2c446ab3a36b2 Mon Sep 17 00:00:00 2001 From: alesharik Date: Tue, 15 Oct 2024 18:17:01 +0300 Subject: [PATCH 7/8] style: fix formatting issues in Pulsar module --- src/pulsar/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/pulsar/mod.rs b/src/pulsar/mod.rs index 1b50d76..d0e1614 100644 --- a/src/pulsar/mod.rs +++ b/src/pulsar/mod.rs @@ -60,7 +60,10 @@ impl Pulsar { } /// Runs admin command after container start - pub fn with_admin_command(mut self, command: impl IntoIterator>) -> Self { + pub fn with_admin_command( + mut self, + command: impl IntoIterator>, + ) -> Self { let mut vec: Vec = command.into_iter().map(Into::into).collect(); vec.insert(0, "bin/pulsar-admin".to_string()); self.admin_commands.push(vec); From 718e022b484fc274d29c6d2cc4300c2253d363a2 Mon Sep 17 00:00:00 2001 From: alesharik Date: Tue, 15 Oct 2024 18:18:19 +0300 Subject: [PATCH 8/8] docs: fix typo in Pulsar::default() --- src/pulsar/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pulsar/mod.rs b/src/pulsar/mod.rs index d0e1614..bf00dca 100644 --- a/src/pulsar/mod.rs +++ b/src/pulsar/mod.rs @@ -37,7 +37,7 @@ pub struct Pulsar { impl Default for Pulsar { /** - * Creates new standalone pulsar container, with `/pulsar/data` as temporary volume + * Creates new standalone pulsar container, with `/pulsar/data` as a temporary volume */ fn default() -> Self { Self {