diff --git a/Cargo.toml b/Cargo.toml index baaf61a..9eafd8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,3 +116,7 @@ required-features = ["mssql_server"] [[example]] name = "surrealdb" required-features = ["surrealdb"] + +[[example]] +name = "mongo" +required-features = ["mongo"] diff --git a/src/kafka/apache.rs b/src/kafka/apache.rs new file mode 100644 index 0000000..cc3871c --- /dev/null +++ b/src/kafka/apache.rs @@ -0,0 +1,314 @@ +use std::{borrow::Cow, collections::HashMap}; +use testcontainers::{ + core::{ContainerPort, ContainerState, ExecCommand, WaitFor}, + Image, +}; + +const KAFKA_NATIVE_IMAGE_NAME: &str = "apache/kafka-native"; +const KAFKA_IMAGE_NAME: &str = "apache/kafka"; +const TAG: &str = "latest"; + +pub const KAFKA_PORT: u16 = 9092; + +const START_SCRIPT: &str = "/opt/kafka/testcontainers_start.sh"; +const DEFAULT_INTERNAL_TOPIC_RF: usize = 1; +const DEFAULT_CLUSTER_ID: &str = "5L6g3nShT-eMCtK--X86sw"; +const DEFAULT_BROKER_ID: usize = 1; + +/// Module to work with [`Apache Kafka`] broker +/// +/// Starts an instance of Apache Kafka broker, with Apache Kafka Raft (KRaft) as the consensus protocol +/// enabled. +/// +/// This module is based on the official [`Apache Kafka docker image`](https://hub.docker.com/r/apache/kafka) +/// +/// Module comes in two flavours: +/// +/// - [`Apache Kafka GraalVM docker image`](https://hub.docker.com/r/apache/kafka), which is default as it provides faster startup and lower memory consumption. 
+/// - [`Apache Kafka JVM docker image`](https://hub.docker.com/r/apache/kafka) +/// +/// # Example +/// ``` +/// use testcontainers_modules::{kafka::apache, testcontainers::runners::SyncRunner}; +/// let kafka_node = apache::Kafka::default().start().unwrap(); +/// +/// let bootstrap_servers = format!("127.0.0.1:{}", kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).unwrap()); +/// +/// // connect to kafka server to send/receive messages +/// ``` + +#[derive(Debug, Clone)] +pub struct Kafka { + env_vars: HashMap<String, String>, + image_name: String, +} + +impl Default for Kafka { + fn default() -> Self { + let mut env_vars = HashMap::new(); + + env_vars.insert( + "KAFKA_LISTENERS".to_owned(), + format!( + "PLAINTEXT://0.0.0.0:{KAFKA_PORT},BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094" + ), + ); + env_vars.insert("CLUSTER_ID".to_owned(), DEFAULT_CLUSTER_ID.to_owned()); + env_vars.insert( + "KAFKA_PROCESS_ROLES".to_owned(), + "broker,controller".to_owned(), + ); + + env_vars.insert( + "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(), + "CONTROLLER".to_owned(), + ); + env_vars.insert( + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(), + "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(), + ); + env_vars.insert( + "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(), + "BROKER".to_owned(), + ); + env_vars.insert( + "KAFKA_ADVERTISED_LISTENERS".to_owned(), + format!("PLAINTEXT://localhost:{KAFKA_PORT},BROKER://localhost:9092",), + ); + env_vars.insert("KAFKA_BROKER_ID".to_owned(), DEFAULT_BROKER_ID.to_string()); + env_vars.insert( + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(), + DEFAULT_INTERNAL_TOPIC_RF.to_string(), + ); + env_vars.insert( + "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(), + format!("{DEFAULT_BROKER_ID}@localhost:9094").to_owned(), + ); + + Self { + env_vars, + image_name: KAFKA_NATIVE_IMAGE_NAME.to_string(), + } + } +} + +impl Kafka { + /// Switches default image to `apache/kafka` instead of `apache/kafka-native` + pub fn with_jvm_image(mut self) -> 
Self { + self.image_name = KAFKA_IMAGE_NAME.to_string(); + + self + } +} + +impl Image for Kafka { + fn name(&self) -> &str { + self.image_name.as_str() + } + + fn tag(&self) -> &str { + TAG + } + + fn ready_conditions(&self) -> Vec<WaitFor> { + vec![] + } + + fn entrypoint(&self) -> Option<&str> { + Some("bash") + } + + fn env_vars( + &self, + ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> { + &self.env_vars + } + + fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> { + vec![ + "-c".to_string(), + format!("while [ ! -f {START_SCRIPT} ]; do sleep 0.1; done; pwd &&chmod 755 {START_SCRIPT} && {START_SCRIPT}"), + ] + .into_iter() + } + + fn expose_ports(&self) -> &[ContainerPort] { + &[ContainerPort::Tcp(KAFKA_PORT)] + } + + fn exec_after_start( + &self, + cs: ContainerState, + ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> { + let mut commands = vec![]; + + let cmd = vec![ + "sh".to_string(), + "-c".to_string(), + format!( + "echo '#!/usr/bin/env bash\nexport KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:{},BROKER://localhost:9093\n/etc/kafka/docker/run \n' > {}", + cs.host_port_ipv4(ContainerPort::Tcp(KAFKA_PORT))?, + START_SCRIPT + ), + ]; + let ready_conditions = vec![WaitFor::message_on_stdout("Kafka Server started")]; + commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions)); + + Ok(commands) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::StreamExt; + use rdkafka::{ + consumer::{Consumer, StreamConsumer}, + producer::{FutureProducer, FutureRecord}, + ClientConfig, Message, + }; + use testcontainers::runners::AsyncRunner; + + use crate::kafka::apache; + + #[tokio::test] + async fn produce_and_consume_messages_graalvm( + ) -> Result<(), Box<dyn std::error::Error + 'static>> { + let _ = pretty_env_logger::try_init(); + let kafka_node = apache::Kafka::default().start().await?; + + let bootstrap_servers = format!( + "127.0.0.1:{}", + kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await? 
+ ); + + let producer = ClientConfig::new() + .set("bootstrap.servers", &bootstrap_servers) + .set("message.timeout.ms", "5000") + .create::<FutureProducer>() + .expect("Failed to create Kafka FutureProducer"); + + let consumer = ClientConfig::new() + .set("group.id", "testcontainer-rs") + .set("bootstrap.servers", &bootstrap_servers) + .set("session.timeout.ms", "6000") + .set("enable.auto.commit", "false") + .set("auto.offset.reset", "earliest") + .create::<StreamConsumer>() + .expect("Failed to create Kafka StreamConsumer"); + + let topic = "test-topic"; + + let number_of_messages_to_produce = 5_usize; + let expected: Vec<String> = (0..number_of_messages_to_produce) + .map(|i| format!("Message {i}")) + .collect(); + + for (i, message) in expected.iter().enumerate() { + producer + .send( + FutureRecord::to(topic) + .payload(message) + .key(&format!("Key {i}")), + Duration::from_secs(0), + ) + .await + .unwrap(); + } + + consumer + .subscribe(&[topic]) + .expect("Failed to subscribe to a topic"); + + let mut message_stream = consumer.stream(); + for produced in expected { + let borrowed_message = + tokio::time::timeout(Duration::from_secs(10), message_stream.next()) + .await + .unwrap() + .unwrap(); + + assert_eq!( + produced, + borrowed_message + .unwrap() + .payload_view::<String>() + .unwrap() + .unwrap() + ); + } + + Ok(()) + } + + #[tokio::test] + async fn produce_and_consume_messages_jvm() -> Result<(), Box<dyn std::error::Error + 'static>> + { + let _ = pretty_env_logger::try_init(); + let kafka_node = apache::Kafka::default().with_jvm_image().start().await?; + + let bootstrap_servers = format!( + "127.0.0.1:{}", + kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await? 
+ ); + + let producer = ClientConfig::new() + .set("bootstrap.servers", &bootstrap_servers) + .set("message.timeout.ms", "5000") + .create::<FutureProducer>() + .expect("Failed to create Kafka FutureProducer"); + + let consumer = ClientConfig::new() + .set("group.id", "testcontainer-rs") + .set("bootstrap.servers", &bootstrap_servers) + .set("session.timeout.ms", "6000") + .set("enable.auto.commit", "false") + .set("auto.offset.reset", "earliest") + .create::<StreamConsumer>() + .expect("Failed to create Kafka StreamConsumer"); + + let topic = "test-topic"; + + let number_of_messages_to_produce = 5_usize; + let expected: Vec<String> = (0..number_of_messages_to_produce) + .map(|i| format!("Message {i}")) + .collect(); + + for (i, message) in expected.iter().enumerate() { + producer + .send( + FutureRecord::to(topic) + .payload(message) + .key(&format!("Key {i}")), + Duration::from_secs(0), + ) + .await + .unwrap(); + } + + consumer + .subscribe(&[topic]) + .expect("Failed to subscribe to a topic"); + + let mut message_stream = consumer.stream(); + for produced in expected { + let borrowed_message = + tokio::time::timeout(Duration::from_secs(10), message_stream.next()) + .await + .unwrap() + .unwrap(); + + assert_eq!( + produced, + borrowed_message + .unwrap() + .payload_view::<String>() + .unwrap() + .unwrap() + ); + } + + Ok(()) + } +} diff --git a/src/kafka/confluent.rs b/src/kafka/confluent.rs new file mode 100644 index 0000000..c0d8dfc --- /dev/null +++ b/src/kafka/confluent.rs @@ -0,0 +1,203 @@ +use std::{borrow::Cow, collections::HashMap}; + +use testcontainers::{ + core::{ContainerPort, ContainerState, ExecCommand, WaitFor}, + Image, +}; + +const NAME: &str = "confluentinc/cp-kafka"; +const TAG: &str = "6.1.1"; + +pub const KAFKA_PORT: u16 = 9093; +const ZOOKEEPER_PORT: u16 = 2181; + +#[derive(Debug, Clone)] +pub struct Kafka { + env_vars: HashMap<String, String>, +} + +impl Default for Kafka { + fn default() -> Self { + let mut env_vars = HashMap::new(); + + env_vars.insert( + "KAFKA_ZOOKEEPER_CONNECT".to_owned(), + 
format!("localhost:{ZOOKEEPER_PORT}"), + ); + env_vars.insert( + "KAFKA_LISTENERS".to_owned(), + format!("PLAINTEXT://0.0.0.0:{KAFKA_PORT},BROKER://0.0.0.0:9092"), + ); + env_vars.insert( + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(), + "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_owned(), + ); + env_vars.insert( + "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(), + "BROKER".to_owned(), + ); + env_vars.insert( + "KAFKA_ADVERTISED_LISTENERS".to_owned(), + format!("PLAINTEXT://localhost:{KAFKA_PORT},BROKER://localhost:9092",), + ); + env_vars.insert("KAFKA_BROKER_ID".to_owned(), "1".to_owned()); + env_vars.insert( + "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(), + "1".to_owned(), + ); + + Self { env_vars } + } +} + +impl Image for Kafka { + fn name(&self) -> &str { + NAME + } + + fn tag(&self) -> &str { + TAG + } + + fn ready_conditions(&self) -> Vec<WaitFor> { + vec![WaitFor::message_on_stdout("Creating new log file")] + } + + fn env_vars( + &self, + ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> { + &self.env_vars + } + + fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> { + vec![ + "/bin/bash".to_owned(), + "-c".to_owned(), + format!( + r#" +echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties; +echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties; +echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties; +zookeeper-server-start zookeeper.properties & +. 
/etc/confluent/docker/bash-config && +/etc/confluent/docker/configure && +/etc/confluent/docker/launch"#, + ), + ] + } + + fn expose_ports(&self) -> &[ContainerPort] { + &[ContainerPort::Tcp(KAFKA_PORT)] + } + + fn exec_after_start( + &self, + cs: ContainerState, + ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> { + let mut commands = vec![]; + let cmd = vec![ + "kafka-configs".to_string(), + "--alter".to_string(), + "--bootstrap-server".to_string(), + "0.0.0.0:9092".to_string(), + "--entity-type".to_string(), + "brokers".to_string(), + "--entity-name".to_string(), + "1".to_string(), + "--add-config".to_string(), + format!( + "advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]", + cs.host_port_ipv4(ContainerPort::Tcp(KAFKA_PORT))? + ), + ]; + let ready_conditions = vec![WaitFor::message_on_stdout( + "Checking need to trigger auto leader balancing", + )]; + commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions)); + Ok(commands) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::StreamExt; + use rdkafka::{ + consumer::{Consumer, StreamConsumer}, + producer::{FutureProducer, FutureRecord}, + ClientConfig, Message, + }; + use testcontainers::runners::AsyncRunner; + + use crate::kafka; + + #[tokio::test] + async fn produce_and_consume_messages() -> Result<(), Box<dyn std::error::Error + 'static>> { + let _ = pretty_env_logger::try_init(); + let kafka_node = kafka::Kafka::default().start().await?; + + let bootstrap_servers = format!( + "127.0.0.1:{}", + kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT).await? 
+ ); + + let producer = ClientConfig::new() + .set("bootstrap.servers", &bootstrap_servers) + .set("message.timeout.ms", "5000") + .create::<FutureProducer>() + .expect("Failed to create Kafka FutureProducer"); + + let consumer = ClientConfig::new() + .set("group.id", "testcontainer-rs") + .set("bootstrap.servers", &bootstrap_servers) + .set("session.timeout.ms", "6000") + .set("enable.auto.commit", "false") + .set("auto.offset.reset", "earliest") + .create::<StreamConsumer>() + .expect("Failed to create Kafka StreamConsumer"); + + let topic = "test-topic"; + + let number_of_messages_to_produce = 5_usize; + let expected: Vec<String> = (0..number_of_messages_to_produce) + .map(|i| format!("Message {i}")) + .collect(); + + for (i, message) in expected.iter().enumerate() { + producer + .send( + FutureRecord::to(topic) + .payload(message) + .key(&format!("Key {i}")), + Duration::from_secs(0), + ) + .await + .unwrap(); + } + + consumer + .subscribe(&[topic]) + .expect("Failed to subscribe to a topic"); + + let mut message_stream = consumer.stream(); + for produced in expected { + let borrowed_message = + tokio::time::timeout(Duration::from_secs(10), message_stream.next()) + .await + .unwrap() + .unwrap(); + + assert_eq!( + produced, + borrowed_message + .unwrap() + .payload_view::<String>() + .unwrap() + .unwrap() + ); + } + + Ok(()) + } +} diff --git a/src/kafka/mod.rs b/src/kafka/mod.rs index c0d8dfc..20875ec 100644 --- a/src/kafka/mod.rs +++ b/src/kafka/mod.rs @@ -1,203 +1,6 @@ -use std::{borrow::Cow, collections::HashMap}; +/// Test container based on Apache Kafka Image +pub mod apache; +/// Test container based on Confluent Kafka Image +pub mod confluent; -use testcontainers::{ - core::{ContainerPort, ContainerState, ExecCommand, WaitFor}, - Image, -}; - -const NAME: &str = "confluentinc/cp-kafka"; -const TAG: &str = "6.1.1"; - -pub const KAFKA_PORT: u16 = 9093; -const ZOOKEEPER_PORT: u16 = 2181; - -#[derive(Debug, Clone)] -pub struct Kafka { - env_vars: HashMap<String, String>, -} - -impl Default for Kafka { - fn default() -> 
Self { - let mut env_vars = HashMap::new(); - - env_vars.insert( - "KAFKA_ZOOKEEPER_CONNECT".to_owned(), - format!("localhost:{ZOOKEEPER_PORT}"), - ); - env_vars.insert( - "KAFKA_LISTENERS".to_owned(), - format!("PLAINTEXT://0.0.0.0:{KAFKA_PORT},BROKER://0.0.0.0:9092"), - ); - env_vars.insert( - "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(), - "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_owned(), - ); - env_vars.insert( - "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(), - "BROKER".to_owned(), - ); - env_vars.insert( - "KAFKA_ADVERTISED_LISTENERS".to_owned(), - format!("PLAINTEXT://localhost:{KAFKA_PORT},BROKER://localhost:9092",), - ); - env_vars.insert("KAFKA_BROKER_ID".to_owned(), "1".to_owned()); - env_vars.insert( - "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(), - "1".to_owned(), - ); - - Self { env_vars } - } -} - -impl Image for Kafka { - fn name(&self) -> &str { - NAME - } - - fn tag(&self) -> &str { - TAG - } - - fn ready_conditions(&self) -> Vec<WaitFor> { - vec![WaitFor::message_on_stdout("Creating new log file")] - } - - fn env_vars( - &self, - ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> { - &self.env_vars - } - - fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> { - vec![ - "/bin/bash".to_owned(), - "-c".to_owned(), - format!( - r#" -echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties; -echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties; -echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties; -zookeeper-server-start zookeeper.properties & -. 
/etc/confluent/docker/bash-config && -/etc/confluent/docker/configure && -/etc/confluent/docker/launch"#, - ), - ] - } - - fn expose_ports(&self) -> &[ContainerPort] { - &[ContainerPort::Tcp(KAFKA_PORT)] - } - - fn exec_after_start( - &self, - cs: ContainerState, - ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> { - let mut commands = vec![]; - let cmd = vec![ - "kafka-configs".to_string(), - "--alter".to_string(), - "--bootstrap-server".to_string(), - "0.0.0.0:9092".to_string(), - "--entity-type".to_string(), - "brokers".to_string(), - "--entity-name".to_string(), - "1".to_string(), - "--add-config".to_string(), - format!( - "advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]", - cs.host_port_ipv4(ContainerPort::Tcp(KAFKA_PORT))? - ), - ]; - let ready_conditions = vec![WaitFor::message_on_stdout( - "Checking need to trigger auto leader balancing", - )]; - commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions)); - Ok(commands) - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use futures::StreamExt; - use rdkafka::{ - consumer::{Consumer, StreamConsumer}, - producer::{FutureProducer, FutureRecord}, - ClientConfig, Message, - }; - use testcontainers::runners::AsyncRunner; - - use crate::kafka; - - #[tokio::test] - async fn produce_and_consume_messages() -> Result<(), Box<dyn std::error::Error + 'static>> { - let _ = pretty_env_logger::try_init(); - let kafka_node = kafka::Kafka::default().start().await?; - - let bootstrap_servers = format!( - "127.0.0.1:{}", - kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT).await? 
- ); - - let producer = ClientConfig::new() - .set("bootstrap.servers", &bootstrap_servers) - .set("message.timeout.ms", "5000") - .create::<FutureProducer>() - .expect("Failed to create Kafka FutureProducer"); - - let consumer = ClientConfig::new() - .set("group.id", "testcontainer-rs") - .set("bootstrap.servers", &bootstrap_servers) - .set("session.timeout.ms", "6000") - .set("enable.auto.commit", "false") - .set("auto.offset.reset", "earliest") - .create::<StreamConsumer>() - .expect("Failed to create Kafka StreamConsumer"); - - let topic = "test-topic"; - - let number_of_messages_to_produce = 5_usize; - let expected: Vec<String> = (0..number_of_messages_to_produce) - .map(|i| format!("Message {i}")) - .collect(); - - for (i, message) in expected.iter().enumerate() { - producer - .send( - FutureRecord::to(topic) - .payload(message) - .key(&format!("Key {i}")), - Duration::from_secs(0), - ) - .await - .unwrap(); - } - - consumer - .subscribe(&[topic]) - .expect("Failed to subscribe to a topic"); - - let mut message_stream = consumer.stream(); - for produced in expected { - let borrowed_message = - tokio::time::timeout(Duration::from_secs(10), message_stream.next()) - .await - .unwrap() - .unwrap(); - - assert_eq!( - produced, - borrowed_message - .unwrap() - .payload_view::<String>() - .unwrap() - .unwrap() - ); - } - - Ok(()) - } -} +pub use confluent::*;