diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2462616..31637c7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -107,6 +107,8 @@ jobs: - uses: taiki-e/install-action@v2 with: tool: cargo-hack + - name: Install Protoc # for pulsar tests + uses: arduino/setup-protoc@v3 - name: Install the latest Oracle instant client run: | curl -Lo basic.zip https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip diff --git a/Cargo.toml b/Cargo.toml index 8ea9468..e1c2eb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ victoria_metrics = [] zookeeper = [] cockroach_db = [] kwok = [] +pulsar = [] [dependencies] # TODO: update parse-display after MSRV>=1.80.0 bump of `testcontainer-rs` and `testcontainers-modules` @@ -113,6 +114,7 @@ clickhouse = "0.11.6" vaultrs = "0.7.2" openssl-sys = { version = "0.9.103", features = ["vendored"] } native-tls = { version = "0.2.12", features = ["vendored"] } +pulsar = "6.3" [[example]] name = "postgres" diff --git a/src/lib.rs b/src/lib.rs index 21a1e6a..6ed7e76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,6 +120,10 @@ pub mod parity_parity; #[cfg_attr(docsrs, doc(cfg(feature = "postgres")))] /// **Postgres** (relational database) testcontainer pub mod postgres; +#[cfg(feature = "pulsar")] +#[cfg_attr(docsrs, doc(cfg(feature = "pulsar")))] +/// **Apache Pulsar** (Cloud-Native, Distributed Messaging and Streaming) testcontainer +pub mod pulsar; #[cfg(feature = "rabbitmq")] #[cfg_attr(docsrs, doc(cfg(feature = "rabbitmq")))] /// **rabbitmq** (message broker) testcontainer diff --git a/src/pulsar/mod.rs b/src/pulsar/mod.rs new file mode 100644 index 0000000..bf00dca --- /dev/null +++ b/src/pulsar/mod.rs @@ -0,0 +1,243 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use testcontainers::{ + core::{CmdWaitFor, ContainerPort, ContainerState, ExecCommand, Mount, WaitFor}, + Image, TestcontainersError, +}; + +const NAME: &str = "apachepulsar/pulsar"; +const TAG: &str = "2.10.6"; + +const PULSAR_PORT: ContainerPort = ContainerPort::Tcp(6650); +const ADMIN_PORT: ContainerPort = ContainerPort::Tcp(8080); + +/// Module to work with [`Apache Pulsar`] inside of tests. +/// **Requires protoc to be installed, otherwise will not build.** +/// +/// This module is based on the official [`Apache Pulsar docker image`]. +/// +/// # Example +/// ``` +/// use testcontainers_modules::{pulsar, testcontainers::runners::SyncRunner}; +/// +/// let pulsar = pulsar::Pulsar::default().start().unwrap(); +/// let http_port = pulsar.get_host_port_ipv4(6650).unwrap(); +/// +/// // do something with the running pulsar instance.. +/// ``` +/// +/// [`Apache Pulsar`]: https://github.com/apache/pulsar +/// [`Apache Pulsar docker image`]: https://hub.docker.com/r/apachepulsar/pulsar/ +#[derive(Debug, Clone)] +pub struct Pulsar { + data_mount: Mount, + env: BTreeMap, + admin_commands: Vec>, +} + +impl Default for Pulsar { + /** + * Creates new standalone pulsar container, with `/pulsar/data` as a temporary volume + */ + fn default() -> Self { + Self { + data_mount: Mount::tmpfs_mount("/pulsar/data"), + env: BTreeMap::new(), + admin_commands: vec![], + } + } +} + +impl Pulsar { + /// Add configuration parameter to Pulsar `conf/standalone.conf` through setting environment variable. + /// + /// Container will rewrite `conf/standalone.conf` file using these variables during startup + /// with help of `bin/apply-config-from-env.py` script + pub fn with_config_env(mut self, name: impl Into, value: impl Into) -> Self { + self.env + .insert(format!("PULSAR_PREFIX_{}", name.into()), value.into()); + self + } + + /// Runs admin command after container start + pub fn with_admin_command( + mut self, + command: impl IntoIterator>, + ) -> Self { + let mut vec: Vec = command.into_iter().map(Into::into).collect(); + vec.insert(0, "bin/pulsar-admin".to_string()); + self.admin_commands.push(vec); + self + } + + /// Creates tenant after container start + pub fn with_tenant(self, tenant: impl Into) -> Self { + let tenant = tenant.into(); + self.with_admin_command(["tenants", "create", &tenant]) + } + + /// Creates namespace after container start + pub fn with_namespace(self, namespace: impl Into) -> Self { + let namespace = namespace.into(); + self.with_admin_command(["namespaces", "create", &namespace]) + } + + /// Creates topic after container start + pub fn with_topic(self, topic: impl Into) -> Self { + let topic = topic.into(); + self.with_admin_command(["topics", "create", &topic]) + } +} + +impl Image for Pulsar { + fn name(&self) -> &str { + NAME + } + + fn tag(&self) -> &str { + TAG + } + + fn ready_conditions(&self) -> Vec { + vec![ + WaitFor::message_on_stdout("HTTP Service started at"), + WaitFor::message_on_stdout("messaging service is ready"), + ] + } + + fn mounts(&self) -> impl IntoIterator { + [&self.data_mount] + } + + fn env_vars( + &self, + ) -> impl IntoIterator>, impl Into>)> { + &self.env + } + + fn cmd(&self) -> impl IntoIterator>> { + [ + "sh", + "-c", + "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone", + ] + } + + fn exec_after_start( + &self, + _cs: ContainerState, + ) -> Result, TestcontainersError> { + Ok(self + .admin_commands + .iter() + .map(|cmd| ExecCommand::new(cmd).with_cmd_ready_condition(CmdWaitFor::exit_code(0))) + .collect()) + } + + fn expose_ports(&self) -> &[ContainerPort] { + &[PULSAR_PORT, ADMIN_PORT] + } +} + +#[cfg(test)] +mod tests { + use futures::StreamExt; + use pulsar::{ + producer::Message, Consumer, DeserializeMessage, Error, Payload, SerializeMessage, + TokioExecutor, + }; + use serde::{Deserialize, Serialize}; + + use super::*; + use crate::testcontainers::runners::AsyncRunner; + + #[derive(Serialize, Deserialize)] + struct TestData { + data: String, + } + + impl DeserializeMessage for TestData { + type Output = Result; + + fn deserialize_message(payload: &Payload) -> Self::Output { + serde_json::from_slice(&payload.data) + } + } + + impl SerializeMessage for TestData { + fn serialize_message(input: Self) -> Result { + Ok(Message { + payload: serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?, + ..Default::default() + }) + } + } + + #[tokio::test] + async fn pulsar_subscribe_and_publish() -> Result<(), Box> { + let topic = "persistent://test/test-ns/test-topic"; + + let pulsar = Pulsar::default() + .with_tenant("test") + .with_namespace("test/test-ns") + .with_topic(topic) + .start() + .await + .unwrap(); + + let endpoint = format!( + "pulsar://0.0.0.0:{}", + pulsar.get_host_port_ipv4(6650).await? + ); + let client = pulsar::Pulsar::builder(endpoint, TokioExecutor) + .build() + .await?; + + let mut consumer: Consumer = + client.consumer().with_topic(topic).build().await?; + + let mut producer = client.producer().with_topic(topic).build().await?; + + producer + .send_non_blocking(TestData { + data: "test".to_string(), + }) + .await? + .await?; + + let data = consumer.next().await.unwrap()?.deserialize()?; + assert_eq!("test", data.data); + + Ok(()) + } + + #[tokio::test] + async fn pulsar_config() -> Result<(), Box> { + let topic = "persistent://test/test-ns/test-topic"; + + let pulsar = Pulsar::default() + .with_tenant("test") + .with_namespace("test/test-ns") + .with_config_env("allowAutoTopicCreation", "false") + .start() + .await + .unwrap(); + + let endpoint = format!( + "pulsar://0.0.0.0:{}", + pulsar.get_host_port_ipv4(6650).await? + ); + let client = pulsar::Pulsar::builder(endpoint, TokioExecutor) + .build() + .await?; + + let producer = client.producer().with_topic(topic).build().await; + + match producer { + Ok(_) => panic!("Producer should return error"), + Err(e) => assert_eq!("Connection error: Server error (Some(TopicNotFound)): Topic not found persistent://test/test-ns/test-topic", e.to_string()), + } + + Ok(()) + } +}