Skip to content

Commit

Permalink
feat!:add mongodb replset option, upgrade mongodb version and add mon…
Browse files Browse the repository at this point in the history
…godb example
  • Loading branch information
sinyo-matu committed Jul 28, 2024
1 parent 7f8988c commit 58f5682
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ bollard = "0.16"
futures = "0.3"
lapin = "2.3.1"
meilisearch-sdk = "0.26.1"
mongodb = "2.6.1"
mongodb = "3.0.1"
mysql = "25.0.0"
neo4rs = "0.7.0"
oracle = "0.6.0"
Expand Down
85 changes: 85 additions & 0 deletions examples/mongo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use mongodb::{bson::doc, Client};
use testcontainers::{runners::AsyncRunner, ContainerAsync};
use testcontainers_modules::mongo::Mongo;

pub async fn get_connection_string(
container: ContainerAsync<Mongo>,
) -> Result<String, testcontainers::core::error::TestcontainersError> {
Ok(format!(
"mongodb://{host}:{port}/",
host = container.get_host().await?,
port = container.get_host_port_ipv4(27017).await?,
))
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct Item {
name: String,
qty: u32,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
let _ = pretty_env_logger::try_init();
// start a simple mongo server
println!("creating a mongodb node...");
let node = Mongo::default().start().await?;
let host = node.get_host().await?;
let port = node.get_host_port_ipv4(27017).await?;
let client = Client::with_uri_str(format!("mongodb://{host}:{port}/")).await?;
let col = client.database("test_db").collection::<Item>("test_col");

let item = Item {
name: "journal".to_string(),
qty: 25,
};
println!("inserting item: {:?}", item);
col.insert_one(item).await?;

println!("finding item...");
let found: Option<Item> = col
.find_one(doc! {
"name": "journal".to_string()
})
.await?;
assert!(found.is_some());
assert!(found.unwrap().qty == 25);
println!("done!");

println!("will try transactions...");
println!("creating a mongodb with replica set node...");
// start mongo server with replica set
let node = Mongo::repl_set().start().await?;
let host = node.get_host().await?;
let port = node.get_host_port_ipv4(27017).await?;
let client = Client::with_uri_str(format!(
"mongodb://{host}:{port}/?directConnection=true&serverSelectionTimeoutMS=2000"
))
.await?;

let col = client.database("test_db").collection::<Item>("test_col");

let item = Item {
name: "mat".to_string(),
qty: 85,
};
let mut session = client.start_session().await?;
println!("starting transaction...");
session.start_transaction().await?;
println!("inserting item: {:?}", item);
// we can use the transactions now
col.insert_one(item).session(&mut session).await?;
println!("committing...");
session.commit_transaction().await?;

println!("finding item...");
let found: Option<Item> = col
.find_one(doc! {
"name": "mat".to_string()
})
.await?;
assert!(found.is_some());
assert!(found.unwrap().qty == 85);
println!("done!");
Ok(())
}
108 changes: 102 additions & 6 deletions src/mongo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
use testcontainers::{core::WaitFor, Image};
use testcontainers::{
core::{CmdWaitFor, ExecCommand, WaitFor},
Image,
};

const NAME: &str = "mongo";
const TAG: &str = "5.0.6";

#[derive(Debug, Clone)]
enum InstanceKind {
Standalone,
ReplSet,
}

impl Default for InstanceKind {
fn default() -> Self {
Self::Standalone
}
}

#[derive(Default, Debug, Clone)]
pub struct Mongo;
pub struct Mongo {
kind: InstanceKind,
}

impl Mongo {
pub fn new() -> Self {
Self {
kind: InstanceKind::Standalone,
}
}
pub fn repl_set() -> Self {
Self {
kind: InstanceKind::ReplSet,
}
}
}

impl Image for Mongo {
fn name(&self) -> &str {
Expand All @@ -18,6 +48,35 @@ impl Image for Mongo {
fn ready_conditions(&self) -> Vec<WaitFor> {
vec![WaitFor::message_on_stdout("Waiting for connections")]
}

fn cmd(&self) -> impl IntoIterator<Item = impl Into<std::borrow::Cow<'_, str>>> {
match self.kind {
InstanceKind::Standalone => Vec::<String>::new(),
InstanceKind::ReplSet => vec!["--replSet".to_string(), "rs".to_string()],
}
}

fn exec_after_start(
&self,
_: testcontainers::core::ContainerState,
) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
match self.kind {
InstanceKind::Standalone => Ok(Default::default()),
InstanceKind::ReplSet => Ok(vec![ExecCommand::new(vec![
"mongosh".to_string(),
"--quiet".to_string(),
"--eval".to_string(),
"'rs.initiate()'".to_string(),
])
.with_cmd_ready_condition(CmdWaitFor::message_on_stdout(
"Using a default configuration for the set",
))
// Wait for the replica set to be ready
// This is a workaround because the replica set isn't immediately ready
// Without this delay, an immediate connection to the replica set would fail
.with_container_ready_conditions(vec![WaitFor::seconds(2)])]),
}
}
}

#[cfg(test)]
Expand All @@ -30,16 +89,16 @@ mod tests {
#[tokio::test]
async fn mongo_fetch_document() -> Result<(), Box<dyn std::error::Error + 'static>> {
let _ = pretty_env_logger::try_init();
let node = mongo::Mongo.start().await?;
let node = mongo::Mongo::default().start().await?;
let host_ip = node.get_host().await?;
let host_port = node.get_host_port_ipv4(27017.tcp()).await?;
let url = format!("mongodb://{host_ip}:{host_port}/");

let client: Client = Client::with_uri_str(&url).await.unwrap();
let db = client.database("some_db");
let coll = db.collection("some-coll");
let coll = db.collection("some_coll");

let insert_one_result = coll.insert_one(bson::doc! { "x": 42 }, None).await.unwrap();
let insert_one_result = coll.insert_one(bson::doc! { "x": 42 }).await.unwrap();
assert!(!insert_one_result
.inserted_id
.as_object_id()
Expand All @@ -48,12 +107,49 @@ mod tests {
.is_empty());

let find_one_result: bson::Document = coll
.find_one(bson::doc! { "x": 42 }, None)
.find_one(bson::doc! { "x": 42 })
.await
.unwrap()
.unwrap();
assert_eq!(42, find_one_result.get_i32("x").unwrap());

Ok(())
}

#[tokio::test]
async fn mongo_repl_set_fetch_document() -> Result<(), Box<dyn std::error::Error + 'static>> {
let _ = pretty_env_logger::try_init();
let node = mongo::Mongo::repl_set().start().await?;
let host_ip = node.get_host().await?;
let host_port = node.get_host_port_ipv4(27017).await?;
let url = format!("mongodb://{host_ip}:{host_port}/?directConnection=true",);

let client: Client = Client::with_uri_str(url).await?;
let db = client.database("some_db");
let coll = db.collection("some-coll");

let mut session = client.start_session().await?;
session.start_transaction().await?;

let insert_one_result = coll
.insert_one(bson::doc! { "x": 42 })
.session(&mut session)
.await?;
assert!(!insert_one_result
.inserted_id
.as_object_id()
.unwrap()
.to_hex()
.is_empty());
session.commit_transaction().await?;

let find_one_result: bson::Document = coll
.find_one(bson::doc! { "x": 42 })
.await
.unwrap()
.unwrap();

assert_eq!(42, find_one_result.get_i32("x").unwrap());
Ok(())
}
}

0 comments on commit 58f5682

Please sign in to comment.