Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Pulsar module #233

Merged
merged 8 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ jobs:
- uses: taiki-e/install-action@v2
with:
tool: cargo-hack
- name: Install Protoc # for pulsar tests
uses: arduino/setup-protoc@v3
- name: Install the latest Oracle instant client
run: |
curl -Lo basic.zip https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ victoria_metrics = []
zookeeper = []
cockroach_db = []
kwok = []
pulsar = []

[dependencies]
# TODO: update parse-display after MSRV>=1.80.0 bump of `testcontainer-rs` and `testcontainers-modules`
Expand Down Expand Up @@ -113,6 +114,7 @@ clickhouse = "0.11.6"
vaultrs = "0.7.2"
openssl-sys = { version = "0.9.103", features = ["vendored"] }
native-tls = { version = "0.2.12", features = ["vendored"] }
pulsar = "6.3"

[[example]]
name = "postgres"
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ pub mod parity_parity;
#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
/// **Postgres** (relational database) testcontainer
pub mod postgres;
#[cfg(feature = "pulsar")]
#[cfg_attr(docsrs, doc(cfg(feature = "pulsar")))]
/// **Apache Pulsar** (Cloud-Native, Distributed Messaging and Streaming) testcontainer
pub mod pulsar;
#[cfg(feature = "rabbitmq")]
#[cfg_attr(docsrs, doc(cfg(feature = "rabbitmq")))]
/// **rabbitmq** (message broker) testcontainer
Expand Down
243 changes: 243 additions & 0 deletions src/pulsar/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
use std::{borrow::Cow, collections::BTreeMap};

use testcontainers::{
core::{CmdWaitFor, ContainerPort, ContainerState, ExecCommand, Mount, WaitFor},
Image, TestcontainersError,
};

const NAME: &str = "apachepulsar/pulsar";
const TAG: &str = "2.10.6";

const PULSAR_PORT: ContainerPort = ContainerPort::Tcp(6650);
const ADMIN_PORT: ContainerPort = ContainerPort::Tcp(8080);

/// Module to work with [`Apache Pulsar`] inside of tests.
/// **Requires protoc to be installed, otherwise will not build.**
///
/// This module is based on the official [`Apache Pulsar docker image`].
///
/// # Example
/// ```
/// use testcontainers_modules::{pulsar, testcontainers::runners::SyncRunner};
///
/// let pulsar = pulsar::Pulsar::default().start().unwrap();
/// let http_port = pulsar.get_host_port_ipv4(6650).unwrap();
///
/// // do something with the running pulsar instance..
/// ```
///
/// [`Apache Pulsar`]: https://github.com/apache/pulsar
/// [`Apache Pulsar docker image`]: https://hub.docker.com/r/apachepulsar/pulsar/
#[derive(Debug, Clone)]
pub struct Pulsar {
data_mount: Mount,
env: BTreeMap<String, String>,
admin_commands: Vec<Vec<String>>,
}

impl Default for Pulsar {
/**
* Creates new standalone pulsar container, with `/pulsar/data` as a temporary volume
*/
fn default() -> Self {
Self {
data_mount: Mount::tmpfs_mount("/pulsar/data"),
env: BTreeMap::new(),
admin_commands: vec![],
}
}
}

impl Pulsar {
/// Add configuration parameter to Pulsar `conf/standalone.conf` through setting environment variable.
///
/// Container will rewrite `conf/standalone.conf` file using these variables during startup
/// with help of `bin/apply-config-from-env.py` script
pub fn with_config_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
self.env
.insert(format!("PULSAR_PREFIX_{}", name.into()), value.into());
self
}

/// Runs admin command after container start
pub fn with_admin_command(
mut self,
command: impl IntoIterator<Item = impl Into<String>>,
) -> Self {
let mut vec: Vec<String> = command.into_iter().map(Into::into).collect();
vec.insert(0, "bin/pulsar-admin".to_string());
self.admin_commands.push(vec);
self
}

/// Creates tenant after container start
pub fn with_tenant(self, tenant: impl Into<String>) -> Self {
let tenant = tenant.into();
self.with_admin_command(["tenants", "create", &tenant])
}

/// Creates namespace after container start
pub fn with_namespace(self, namespace: impl Into<String>) -> Self {
let namespace = namespace.into();
self.with_admin_command(["namespaces", "create", &namespace])
}

/// Creates topic after container start
pub fn with_topic(self, topic: impl Into<String>) -> Self {
let topic = topic.into();
self.with_admin_command(["topics", "create", &topic])
}
}

impl Image for Pulsar {
fn name(&self) -> &str {
NAME
}

fn tag(&self) -> &str {
TAG
}

fn ready_conditions(&self) -> Vec<WaitFor> {
vec![
WaitFor::message_on_stdout("HTTP Service started at"),
WaitFor::message_on_stdout("messaging service is ready"),
]
}

fn mounts(&self) -> impl IntoIterator<Item = &Mount> {
[&self.data_mount]
}

fn env_vars(
&self,
) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
&self.env
}

fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
[
"sh",
"-c",
"bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone",
]
}

fn exec_after_start(
&self,
_cs: ContainerState,
) -> Result<Vec<ExecCommand>, TestcontainersError> {
Ok(self
.admin_commands
.iter()
.map(|cmd| ExecCommand::new(cmd).with_cmd_ready_condition(CmdWaitFor::exit_code(0)))
.collect())
}

fn expose_ports(&self) -> &[ContainerPort] {
&[PULSAR_PORT, ADMIN_PORT]
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;
use pulsar::{
producer::Message, Consumer, DeserializeMessage, Error, Payload, SerializeMessage,
TokioExecutor,
};
use serde::{Deserialize, Serialize};

use super::*;
use crate::testcontainers::runners::AsyncRunner;

#[derive(Serialize, Deserialize)]
struct TestData {
data: String,
}

impl DeserializeMessage for TestData {
type Output = Result<TestData, serde_json::Error>;

fn deserialize_message(payload: &Payload) -> Self::Output {
serde_json::from_slice(&payload.data)
}
}

impl SerializeMessage for TestData {
fn serialize_message(input: Self) -> Result<Message, Error> {
Ok(Message {
payload: serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?,
..Default::default()
})
}
}

#[tokio::test]
async fn pulsar_subscribe_and_publish() -> Result<(), Box<dyn std::error::Error + 'static>> {
let topic = "persistent://test/test-ns/test-topic";

let pulsar = Pulsar::default()
.with_tenant("test")
.with_namespace("test/test-ns")
.with_topic(topic)
.start()
.await
.unwrap();

let endpoint = format!(
"pulsar://0.0.0.0:{}",
pulsar.get_host_port_ipv4(6650).await?
);
let client = pulsar::Pulsar::builder(endpoint, TokioExecutor)
.build()
.await?;

let mut consumer: Consumer<TestData, _> =
client.consumer().with_topic(topic).build().await?;

let mut producer = client.producer().with_topic(topic).build().await?;

producer
.send_non_blocking(TestData {
data: "test".to_string(),
})
.await?
.await?;

let data = consumer.next().await.unwrap()?.deserialize()?;
assert_eq!("test", data.data);

Ok(())
}

#[tokio::test]
async fn pulsar_config() -> Result<(), Box<dyn std::error::Error + 'static>> {
let topic = "persistent://test/test-ns/test-topic";

let pulsar = Pulsar::default()
.with_tenant("test")
.with_namespace("test/test-ns")
.with_config_env("allowAutoTopicCreation", "false")
.start()
.await
.unwrap();

let endpoint = format!(
"pulsar://0.0.0.0:{}",
pulsar.get_host_port_ipv4(6650).await?
);
let client = pulsar::Pulsar::builder(endpoint, TokioExecutor)
.build()
.await?;

let producer = client.producer().with_topic(topic).build().await;

match producer {
Ok(_) => panic!("Producer should return error"),
Err(e) => assert_eq!("Connection error: Server error (Some(TopicNotFound)): Topic not found persistent://test/test-ns/test-topic", e.to_string()),
}

Ok(())
}
}
Loading