diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a125e761..12f8bd89 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,6 +12,7 @@ jobs: - test-tokio-postgres - test-mysql - test-mysql-async + - test-tiberius - doc steps: - run: exit 0 @@ -88,7 +89,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: toolchain: ${{ matrix.rust }} - - run: cd refinery && cargo test --features tokio,tokio-postgres --test tokio_postgres -- --test-threads 1 + - run: cd refinery && cargo test --features tokio-postgres --test tokio_postgres -- --test-threads 1 test-mysql: name: Test mysql @@ -135,7 +136,29 @@ jobs: - uses: actions-rs/toolchain@v1 with: toolchain: ${{ matrix.rust }} - - run: cd refinery && cargo test --features tokio,mysql_async --test mysql_async -- --test-threads 1 + - run: cd refinery && cargo test --features mysql_async --test mysql_async -- --test-threads 1 + + test-tiberius: + name: Test tiberius + runs-on: ubuntu-latest + strategy: + matrix: + rust: [stable, nightly, 1.52.0] + services: + mssql: + image: mcr.microsoft.com/mssql/server:2017-latest + ports: + - 1433:1433 + env: + ACCEPT_EULA: yes + SA_PASSWORD: Passw0rd + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.rust }} + - run: cargo install --path ./refinery_cli --no-default-features --features=mssql + - run: cd refinery && cargo test --features tiberius --test tiberius -- --test-threads 1 doc: name: Build docs diff --git a/README.md b/README.md index b3f58efc..ef72f396 100644 --- a/README.md +++ b/README.md @@ -50,10 +50,10 @@ For more examples, refer to the [`examples`](examples). ### Unversioned VS Versioned migrations Depending on how your project / team has been structured will define whether you want to use Versioned migrations `V{1}__{2}.[sql|rs]` or Unversioned migrations `U{1}__{2}.[sql|rs]`. -If all migrations are created synchronously and are deployed synchronously you won't run into any problems using Versioned migrations. +If all migrations are created synchronously and are deployed synchronously you won't run into any problems using Versioned migrations. This is because you can be sure the next migration being run is _always_ going to have a version number greater than the previous. -With Unversioned migrations there is more flexibility in the order that the migrations can be created and deployed. +With Unversioned migrations there is more flexibility in the order that the migrations can be created and deployed. If developer 1 creates a PR with a migration today `U11__update_cars_table.sql`, but it is reviewed for a week. Meanwhile developer 2 creates a PR with migration `U12__create_model_tags.sql` that is much simpler and gets merged and deployed immediately. This would stop developer 1's migration from ever running if you were using Versioned migrations because the next migration would need to be > 12. @@ -67,15 +67,15 @@ By default, refinery runs each migration in a single transaction. Alternatively, refinery's design is based on [flyway](https://flywaydb.org/) and so, shares its [perspective](https://flywaydb.org/documentation/command/undo#important-notes) on undo/rollback migrations. To undo/rollback a migration, you have to generate a new one and write specifically what you want to undo. -## Compatibility +## MSRV refinery aims to support stable Rust, the previous Rust version, and nightly. ## Async -Starting with version 0.2 refinery supports [tokio-postgres](https://crates.io/crates/tokio-postgres) and [`mysql_async`](https://crates.io/crates/mysql_async). To migrate async you have to call `Runner`'s [run_async](https://docs.rs/refinery/latest/refinery/struct.Runner.html). -There are plans to support [Tiberius](https://github.com/steffengy/tiberius) when futures 0.3 support stabilizes. -For Rusqlite, the best way to run migrations in an async context is to run them inside tokio's [`spawn_blocking`](https://docs.rs/tokio/0.2.21/tokio/task/fn.spawn_blocking.html) for example. +Starting with version 0.2 refinery supports [tokio-postgres](https://crates.io/crates/tokio-postgres), [`mysql_async`](https://crates.io/crates/mysql_async) +and [Tiberius](https://github.com/prisma/tiberius) +For Rusqlite, the best way to run migrations in an async context is to run them inside tokio's [`spawn_blocking`](https://docs.rs/tokio/1.10.0/tokio/task/fn.spawn_blocking.html) for example. ## Contributing diff --git a/refinery/Cargo.toml b/refinery/Cargo.toml index d4a92222..5ba28d60 100644 --- a/refinery/Cargo.toml +++ b/refinery/Cargo.toml @@ -19,16 +19,19 @@ postgres = ["refinery-core/postgres"] mysql = ["refinery-core/mysql"] tokio-postgres = ["refinery-core/tokio-postgres"] mysql_async = ["refinery-core/mysql_async"] -tokio = ["refinery-core/tokio"] +tiberius = ["refinery-core/tiberius"] +tiberius-config = ["refinery-core/tiberius", "refinery-core/tiberius-config"] [dependencies] refinery-core= { version = "0.6.0", path = "../refinery_core" } refinery-macros= { version = "0.6.0", path = "../refinery_macros" } [dev-dependencies] -barrel = { version = "0.6", features = ["sqlite3", "pg", "mysql"] } +barrel = { git = "https://git.irde.st/spacekookie/barrel.git", features = ["sqlite3", "pg", "mysql", "mssql"] } futures = "0.3" assert_cmd = "1.0" predicates = "1" tempfile = "3" chrono = "0.4" +tokio-util = { version = "0.6.7", features = ["compat"] } +tokio = { version = "1.9.0", features = ["full"] } diff --git a/refinery/tests/tiberius.rs b/refinery/tests/tiberius.rs new file mode 100644 index 00000000..ca782e1f --- /dev/null +++ b/refinery/tests/tiberius.rs @@ -0,0 +1,740 @@ +use barrel::backend::MsSql as Sql; +mod mod_migrations; + +#[cfg(all(feature = "tiberius-config"))] +mod tiberius { + use super::mod_migrations; + use assert_cmd::prelude::*; + use chrono::Local; + use futures::FutureExt; + use predicates::str::contains; + use refinery::{ + config::{Config, ConfigDbType}, + AsyncMigrate, Migration, Runner, + }; + use refinery_core::tiberius::{self, Config as TConfig}; + use std::convert::TryInto; + use std::panic::AssertUnwindSafe; + use std::process::Command; + use tokio_util::compat::TokioAsyncWriteCompatExt; + + fn get_migrations() -> Vec { + let migration1 = Migration::unapplied( + "V1__initial.sql", + include_str!("./sql_migrations/V1-2/V1__initial.sql"), + ) + .unwrap(); + + let migration2 = Migration::unapplied( + "V2__add_cars_and_motos_table.sql", + include_str!("./sql_migrations/V1-2/V2__add_cars_and_motos_table.sql"), + ) + .unwrap(); + + let migration3 = Migration::unapplied( + "V3__add_brand_to_cars_table", + include_str!("./sql_migrations/V3/V3__add_brand_to_cars_table.sql"), + ) + .unwrap(); + + let migration4 = Migration::unapplied( + "V4__add_year_to_motos_table.sql", + include_str!("./sql_migrations/V4__add_year_to_motos_table.sql"), + ) + .unwrap(); + + let migration5 = Migration::unapplied( + "V5__add_year_field_to_cars", + &"ALTER TABLE cars ADD year INTEGER;", + ) + .unwrap(); + + vec![migration1, migration2, migration3, migration4, migration5] + } + + mod embedded { + use refinery::embed_migrations; + embed_migrations!("./tests/sql_migrations"); + } + + mod broken { + use refinery::embed_migrations; + embed_migrations!("./tests/sql_migrations_broken"); + } + + mod missing { + use refinery::embed_migrations; + embed_migrations!("./tests/sql_migrations_missing"); + } + + async fn run_test>(t: T) { + let config = generate_config("tempdb"); + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + client + .simple_query("CREATE DATABASE refinery_test") + .await + .unwrap(); + + let result = AssertUnwindSafe(t).catch_unwind().await; + client + .simple_query("DROP DATABASE refinery_test") + .await + .unwrap(); + + assert!(result.is_ok()); + } + + fn generate_config(database: &str) -> Config { + let config = Config::new(ConfigDbType::Mssql) + .set_db_name(database) + .set_db_user("SA") + .set_db_host("localhost") + .set_db_pass("Passw0rd") + .set_db_port("1433"); + + config + } + + #[tokio::test] + async fn embedded_creates_migration_table() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!("{}:{}", config.db_host().unwrap(), config.db_port().unwrap())) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()).await.unwrap(); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let row = client + .simple_query("SELECT table_name FROM information_schema.tables WHERE table_name='refinery_schema_history'") + .await + .unwrap() + .into_row() + .await + .unwrap() + .unwrap(); + + let name: &str = row.get(0).unwrap(); + assert_eq!("refinery_schema_history", name); + + }).await; + } + + #[tokio::test] + async fn embedded_creates_migration_table_grouped_transaction() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!("{}:{}", config.db_host().unwrap(), config.db_port().unwrap())) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()).await.unwrap(); + + embedded::migrations::runner() + .set_grouped(true) + .run_async(&mut client) + .await + .unwrap(); + + let row = client + .simple_query("SELECT table_name FROM information_schema.tables WHERE table_name='refinery_schema_history'") + .await + .unwrap() + .into_row() + .await + .unwrap() + .unwrap(); + + let name: &str = row.get(0).unwrap(); + assert_eq!("refinery_schema_history", name); + + }).await; + } + + #[tokio::test] + async fn report_contains_applied_migrations() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + let report = embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let migrations = get_migrations(); + let applied_migrations = report.applied_migrations(); + + assert_eq!(4, applied_migrations.len()); + + assert_eq!(migrations[0].version(), applied_migrations[0].version()); + assert_eq!(migrations[1].version(), applied_migrations[1].version()); + assert_eq!(migrations[2].version(), applied_migrations[2].version()); + assert_eq!(migrations[3].version(), applied_migrations[3].version()); + + assert_eq!(migrations[0].name(), migrations[0].name()); + assert_eq!(migrations[1].name(), applied_migrations[1].name()); + assert_eq!(migrations[2].name(), applied_migrations[2].name()); + assert_eq!(migrations[3].name(), applied_migrations[3].name()); + + assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); + assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); + assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); + assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); + }) + .await; + } + + #[tokio::test] + async fn embedded_applies_migration() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + client + .simple_query("INSERT INTO persons (name, city) VALUES ('John Legend', 'New York')") + .await + .unwrap(); + + let row = client + .simple_query("SELECT name, city FROM persons") + .await + .unwrap() + .into_row() + .await + .unwrap() + .unwrap(); + + let name: &str = row.get(0).unwrap(); + let city: &str = row.get(1).unwrap(); + + assert_eq!("John Legend", name); + assert_eq!("New York", city); + }) + .await + } + + #[tokio::test] + async fn embedded_applies_migration_grouped_transaction() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + embedded::migrations::runner() + .set_grouped(true) + .run_async(&mut client) + .await + .unwrap(); + + client + .simple_query("INSERT INTO persons (name, city) VALUES ('John Legend', 'New York')") + .await + .unwrap(); + + let row = client + .simple_query("SELECT name, city FROM persons") + .await + .unwrap() + .into_row() + .await + .unwrap() + .unwrap(); + + let name: &str = row.get(0).unwrap(); + let city: &str = row.get(1).unwrap(); + + assert_eq!("John Legend", name); + assert_eq!("New York", city); + }) + .await + } + + #[tokio::test] + async fn embedded_updates_schema_history() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let current = client.get_last_applied_migration().await.unwrap().unwrap(); + assert_eq!(4, current.version()); + assert_eq!(Local::today(), current.applied_on().unwrap().date()); + }) + .await + } + + #[tokio::test] + async fn embedded_updates_schema_history_grouped_transaction() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + embedded::migrations::runner() + .set_grouped(true) + .run_async(&mut client) + .await + .unwrap(); + + let current = client.get_last_applied_migration().await.unwrap().unwrap(); + assert_eq!(4, current.version()); + assert_eq!(Local::today(), current.applied_on().unwrap().date()); + }) + .await + } + + #[tokio::test] + async fn embedded_updates_to_last_working_if_not_grouped_transaction() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + let result = broken::migrations::runner().run_async(&mut client).await; + + let current = client.get_last_applied_migration().await.unwrap().unwrap(); + + let err = result.unwrap_err(); + let migrations = get_migrations(); + let applied_migrations = err.report().unwrap().applied_migrations(); + + assert_eq!(Local::today(), current.applied_on().unwrap().date()); + assert_eq!(2, current.version()); + assert_eq!(2, applied_migrations.len()); + + assert_eq!(1, applied_migrations[0].version()); + assert_eq!(2, applied_migrations[1].version()); + + assert_eq!("initial", migrations[0].name()); + assert_eq!("add_cars_table", applied_migrations[1].name()); + + assert_eq!(2959965718684201605, applied_migrations[0].checksum()); + assert_eq!(8238603820526370208, applied_migrations[1].checksum()); + }) + .await + } + + #[tokio::test] + async fn embedded_doesnt_update_to_last_working_if_grouped() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + let result = broken::migrations::runner() + .set_grouped(true) + .run_async(&mut client) + .await; + + let current = client.get_last_applied_migration().await.unwrap(); + + dbg!(¤t); + assert!(current.is_none()); + // matches!(current, None); + + result.unwrap_err(); + + let row = client + .simple_query("SELECT version FROM refinery_schema_history") + .await + .unwrap() + .into_row() + .await + .unwrap(); + + assert!(row.is_none()); + }) + .await + } + + #[tokio::test] + async fn mod_creates_migration_table() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!("{}:{}", config.db_host().unwrap(), config.db_port().unwrap())) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()).await.unwrap(); + + mod_migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let row = client + .simple_query("SELECT table_name FROM information_schema.tables WHERE table_name='refinery_schema_history'") + .await + .unwrap() + .into_row() + .await + .unwrap() + .unwrap(); + + let name: &str = row.get(0).unwrap(); + assert_eq!("refinery_schema_history", name); + + }).await; + } + + #[tokio::test] + async fn mod_applies_migration() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + mod_migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + client + .simple_query("INSERT INTO persons (name, city) VALUES ('John Legend', 'New York')") + .await + .unwrap(); + + let row = client + .simple_query("SELECT name, city FROM persons") + .await + .unwrap() + .into_row() + .await + .unwrap() + .unwrap(); + + let name: &str = row.get(0).unwrap(); + let city: &str = row.get(1).unwrap(); + + assert_eq!("John Legend", name); + assert_eq!("New York", city); + }) + .await + } + + #[tokio::test] + async fn mod_updates_schema_history() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + mod_migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let current = client.get_last_applied_migration().await.unwrap().unwrap(); + assert_eq!(4, current.version()); + assert_eq!(Local::today(), current.applied_on().unwrap().date()); + }) + .await + } + + #[tokio::test] + async fn gets_applied_migrations() { + run_test(async { + let config = generate_config("refinery_test"); + + let tcp = tokio::net::TcpStream::connect(format!( + "{}:{}", + config.db_host().unwrap(), + config.db_port().unwrap() + )) + .await + .unwrap(); + let mut tconfig: TConfig = (&config).try_into().unwrap(); + tconfig.trust_cert(); + let mut client = tiberius::Client::connect(tconfig, tcp.compat_write()) + .await + .unwrap(); + + embedded::migrations::runner() + .run_async(&mut client) + .await + .unwrap(); + + let applied_migrations = client.get_applied_migrations().await.unwrap(); + let migrations = get_migrations(); + assert_eq!(4, applied_migrations.len()); + + assert_eq!(migrations[0].version(), applied_migrations[0].version()); + assert_eq!(migrations[1].version(), applied_migrations[1].version()); + assert_eq!(migrations[2].version(), applied_migrations[2].version()); + assert_eq!(migrations[3].version(), applied_migrations[3].version()); + + assert_eq!(migrations[0].name(), migrations[0].name()); + assert_eq!(migrations[1].name(), applied_migrations[1].name()); + assert_eq!(migrations[2].name(), applied_migrations[2].name()); + assert_eq!(migrations[3].name(), applied_migrations[3].name()); + + assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); + assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); + assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); + assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); + }) + .await; + } + + #[tokio::test] + async fn migrates_from_config() { + run_test(async { + let mut config = generate_config("refinery_test"); + config.set_trust_cert(); + + let migrations = get_migrations(); + let runner = Runner::new(&migrations) + .set_grouped(false) + .set_abort_divergent(true) + .set_abort_missing(true); + + runner.run_async(&mut config).await.unwrap(); + + let applied_migrations = runner + .get_applied_migrations_async(&mut config) + .await + .unwrap(); + assert_eq!(5, applied_migrations.len()); + + assert_eq!(migrations[0].version(), applied_migrations[0].version()); + assert_eq!(migrations[1].version(), applied_migrations[1].version()); + assert_eq!(migrations[2].version(), applied_migrations[2].version()); + assert_eq!(migrations[3].version(), applied_migrations[3].version()); + assert_eq!(migrations[4].version(), applied_migrations[4].version()); + + assert_eq!(migrations[0].name(), migrations[0].name()); + assert_eq!(migrations[1].name(), applied_migrations[1].name()); + assert_eq!(migrations[2].name(), applied_migrations[2].name()); + assert_eq!(migrations[3].name(), applied_migrations[3].name()); + assert_eq!(migrations[4].name(), applied_migrations[4].name()); + + assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); + assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); + assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); + assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); + assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + }) + .await; + } + + #[tokio::test] + async fn migrate_from_config_report_contains_migrations() { + run_test(async { + let mut config = generate_config("refinery_test"); + config.set_trust_cert(); + + let migrations = get_migrations(); + let runner = Runner::new(&migrations) + .set_grouped(false) + .set_abort_divergent(true) + .set_abort_missing(true); + + let report = runner.run_async(&mut config).await.unwrap(); + + let applied_migrations = report.applied_migrations(); + assert_eq!(5, applied_migrations.len()); + + assert_eq!(migrations[0].version(), applied_migrations[0].version()); + assert_eq!(migrations[1].version(), applied_migrations[1].version()); + assert_eq!(migrations[2].version(), applied_migrations[2].version()); + assert_eq!(migrations[3].version(), applied_migrations[3].version()); + assert_eq!(migrations[4].version(), applied_migrations[4].version()); + + assert_eq!(migrations[0].name(), migrations[0].name()); + assert_eq!(migrations[1].name(), applied_migrations[1].name()); + assert_eq!(migrations[2].name(), applied_migrations[2].name()); + assert_eq!(migrations[3].name(), applied_migrations[3].name()); + assert_eq!(migrations[4].name(), applied_migrations[4].name()); + + assert_eq!(migrations[0].checksum(), applied_migrations[0].checksum()); + assert_eq!(migrations[1].checksum(), applied_migrations[1].checksum()); + assert_eq!(migrations[2].checksum(), applied_migrations[2].checksum()); + assert_eq!(migrations[3].checksum(), applied_migrations[3].checksum()); + assert_eq!(migrations[4].checksum(), applied_migrations[4].checksum()); + }) + .await; + } + + #[tokio::test] + async fn migrate_from_config_report_returns_last_applied_migration() { + run_test(async { + let mut config = generate_config("refinery_test"); + config.set_trust_cert(); + + let migrations = get_migrations(); + let runner = Runner::new(&migrations) + .set_grouped(false) + .set_abort_divergent(true) + .set_abort_missing(true); + + runner.run_async(&mut config).await.unwrap(); + + let applied_migration = runner + .get_last_applied_migration_async(&mut config) + .await + .unwrap() + .unwrap(); + assert_eq!(5, applied_migration.version()); + + assert_eq!(migrations[4].version(), applied_migration.version()); + assert_eq!(migrations[4].name(), applied_migration.name()); + assert_eq!(migrations[4].checksum(), applied_migration.checksum()); + }) + .await; + } + + // this is a blocking test, but shouldn't do arm running it inside tokio's runtime + #[tokio::test] + async fn migrates_from_cli() { + run_test(async { + Command::new("refinery") + .args(&[ + "migrate", + "-c", + "tests/tiberius_refinery.toml", + "files", + "-p", + "tests/sql_migrations", + ]) + .unwrap() + .assert() + .stdout(contains("applying migration: V4__add_year_to_motos_table")); + }) + .await; + } +} diff --git a/refinery/tests/tiberius_refinery.toml b/refinery/tests/tiberius_refinery.toml new file mode 100644 index 00000000..700402dc --- /dev/null +++ b/refinery/tests/tiberius_refinery.toml @@ -0,0 +1,8 @@ +[main] +envix = "Develop" +db_type = "Mssql" +db_host = "localhost" +db_port = "1433" +db_user = "SA" +db_pass = "Passw0rd" +trust_cert = true diff --git a/refinery_cli/Cargo.toml b/refinery_cli/Cargo.toml index a9f676d8..6cd8a3d6 100644 --- a/refinery_cli/Cargo.toml +++ b/refinery_cli/Cargo.toml @@ -19,7 +19,8 @@ default = ["mysql", "postgresql", "sqlite-bundled"] postgresql = ["refinery-core/postgres"] mysql = ["refinery-core/mysql"] sqlite = ["refinery-core/rusqlite"] -sqlite-bundled = ["refinery-core/rusqlite-bundled"] +sqlite-bundled = ["sqlite", "refinery-core/rusqlite-bundled"] +mssql = ["refinery-core/tiberius-config", "tokio"] [dependencies] refinery-core = { version = "0.6", path = "../refinery_core", default-features = false } @@ -31,6 +32,8 @@ log = "0.4" anyhow = "1" regex = "1" walkdir = "2.3.1" +cfg-if = "1.0.0" +tokio = { version = "1.0", features = ["full"], optional = true } [dev-dependencies] predicates = "1" diff --git a/refinery_cli/src/migrate.rs b/refinery_cli/src/migrate.rs index c3ecc194..fb6f953f 100644 --- a/refinery_cli/src/migrate.rs +++ b/refinery_cli/src/migrate.rs @@ -54,11 +54,38 @@ fn run_files_migrations( migrations.push(migration); } let mut config = config(config_location, env_var_opt)?; - Runner::new(&migrations) - .set_grouped(grouped) - .set_abort_divergent(divergent) - .set_abort_missing(missing) - .run(&mut config)?; + + cfg_if::cfg_if! { + if #[cfg(any(feature = "mysql", feature = "postgresql", feature = "sqlite"))] { + Runner::new(&migrations) + .set_grouped(grouped) + .set_abort_divergent(divergent) + .set_abort_missing(missing) + .run(&mut config)?; + } + } + + cfg_if::cfg_if! { + // tiberius is an async driver so we spawn tokio runtime and run the migrations + if #[cfg(feature = "mssql")] { + use tokio::runtime::Builder; + + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .context("Can't start tokio runtime")?; + + runtime.block_on(async { + Runner::new(&migrations) + .set_grouped(grouped) + .set_abort_divergent(divergent) + .set_abort_missing(missing) + .run_async(&mut config) + .await + })?; + } + } + Ok(()) } diff --git a/refinery_cli/src/setup.rs b/refinery_cli/src/setup.rs index 3801ef23..1cbdfcd1 100644 --- a/refinery_cli/src/setup.rs +++ b/refinery_cli/src/setup.rs @@ -30,7 +30,7 @@ pub fn handle_setup(_: &ArgMatches) -> Result<()> { } fn get_config_from_input() -> Result { - println!("Select database 1) Mysql 2) Postgresql 3) Sqlite: "); + println!("Select database 1) Mysql 2) Postgresql 3) Sqlite 4) Mssql: "); print!("Enter a number: "); io::stdout().flush()?; @@ -40,19 +40,26 @@ fn get_config_from_input() -> Result { "1" => ConfigDbType::Mysql, "2" => ConfigDbType::Postgres, "3" => ConfigDbType::Sqlite, + "4" => ConfigDbType::Mssql, _ => return Err(anyhow!("invalid option")), }; let mut config = Config::new(db_type); if config.db_type() == ConfigDbType::Sqlite { - print!("Enter database path: "); - io::stdout().flush()?; - let mut db_path = String::new(); - io::stdin().read_line(&mut db_path)?; - //remove \n - db_path.pop(); - config = config.set_db_path(db_path.trim()); - return Ok(config); + cfg_if::cfg_if! { + if #[cfg(feature = "sqlite")] { + print!("Enter database path: "); + io::stdout().flush()?; + let mut db_path = String::new(); + io::stdin().read_line(&mut db_path)?; + //remove \n + db_path.pop(); + config = config.set_db_path(db_path.trim()); + return Ok(config); + } else { + panic!("tried to migrate async from config for a sqlite database, but sqlite feature was not enabled!"); + } + } } print!("Enter database host: "); diff --git a/refinery_core/Cargo.toml b/refinery_core/Cargo.toml index 9a3edce7..6b0215ce 100644 --- a/refinery_core/Cargo.toml +++ b/refinery_core/Cargo.toml @@ -11,6 +11,9 @@ edition = "2018" [features] default = [] rusqlite-bundled = ["rusqlite", "rusqlite/bundled"] +tiberius = ["tiberius-driver", "futures"] +tiberius-config = ["tiberius", "tokio", "tokio-util"] +tokio-postgres = ["tokio-postgres-driver", "tokio"] [dependencies] async-trait = "0.1" @@ -28,13 +31,16 @@ walkdir = "2.3.1" rusqlite = {version = ">= 0.23, < 0.26", optional = true} postgres = {version = "0.19", optional = true} -tokio-postgres = { version = "0.7", optional = true } +tokio-postgres-driver = { package = "tokio-postgres", version = "0.7", optional = true } mysql = { version = "21.0.0", optional = true } mysql_async = { version = "0.28.0", optional = true } - tokio = { version = "1.0", features = ["full"], optional = true } +tiberius-driver = { package = "tiberius", version = "0.6", optional = true } +futures = { version = "0.3.16", optional = true } +tokio-util = { version = "0.6.7", features = ["compat"], optional = true } [dev-dependencies] +barrel = { git = "https://git.irde.st/spacekookie/barrel.git", features = ["sqlite3", "pg", "mysql", "mssql"] } tempfile = "3.1.0" [package.metadata.docs.rs] diff --git a/refinery_core/src/config.rs b/refinery_core/src/config.rs index 84660403..e93674cd 100644 --- a/refinery_core/src/config.rs +++ b/refinery_core/src/config.rs @@ -19,6 +19,7 @@ pub enum ConfigDbType { Mysql, Postgres, Sqlite, + Mssql, } impl Config { @@ -33,6 +34,8 @@ impl Config { db_user: None, db_pass: None, db_name: None, + #[cfg(feature = "tiberius-config")] + trust_cert: false, }, } } @@ -98,15 +101,43 @@ impl Config { Ok(config) } - #[cfg(feature = "rusqlite")] - pub(crate) fn db_path(&self) -> Option<&Path> { - self.main.db_path.as_deref() + cfg_if::cfg_if! { + if #[cfg(feature = "rusqlite")] { + pub(crate) fn db_path(&self) -> Option<&Path> { + self.main.db_path.as_deref() + } + + pub fn set_db_path(self, db_path: &str) -> Config { + Config { + main: Main { + db_path: Some(db_path.into()), + ..self.main + }, + } + } + } + } + + cfg_if::cfg_if! { + if #[cfg(feature = "tiberius-config")] { + pub fn set_trust_cert(&mut self) { + self.main.trust_cert = true; + } + } } pub fn db_type(&self) -> ConfigDbType { self.main.db_type } + pub fn db_host(&self) -> Option<&str> { + self.main.db_host.as_deref() + } + + pub fn db_port(&self) -> Option<&str> { + self.main.db_port.as_deref() + } + pub fn set_db_user(self, db_user: &str) -> Config { Config { main: Main { @@ -125,15 +156,6 @@ impl Config { } } - pub fn set_db_path(self, db_path: &str) -> Config { - Config { - main: Main { - db_path: Some(db_path.into()), - ..self.main - }, - } - } - pub fn set_db_host(self, db_host: &str) -> Config { Config { main: Main { @@ -171,6 +193,7 @@ impl TryFrom for Config { "postgres" => ConfigDbType::Postgres, "postgresql" => ConfigDbType::Postgres, "sqlite" => ConfigDbType::Sqlite, + "mssql" => ConfigDbType::Mssql, _ => { return Err(Error::new( Kind::ConfigError("Unsupported database".into()), @@ -193,6 +216,8 @@ impl TryFrom for Config { db_user: Some(url.username().to_string()), db_pass: url.password().map(|r| r.to_string()), db_name: Some(url.path().trim_start_matches('/').to_string()), + #[cfg(feature = "tiberius-config")] + trust_cert: false, }, }) } @@ -222,6 +247,9 @@ struct Main { db_user: Option, db_pass: Option, db_name: Option, + #[cfg(feature = "tiberius-config")] + #[serde(default)] + trust_cert: bool, } #[cfg(any( @@ -255,6 +283,45 @@ pub(crate) fn build_db_url(name: &str, config: &Config) -> String { url } +cfg_if::cfg_if! { + if #[cfg(feature = "tiberius-config")] { + use tiberius_driver::{AuthMethod, Config as TConfig}; + + impl TryFrom<&Config> for TConfig { + type Error=Error; + + fn try_from(config: &Config) -> Result { + let mut tconfig = TConfig::new(); + if let Some(host) = &config.main.db_host { + tconfig.host(host); + } + + if let Some(port) = &config.main.db_port { + let port = port.parse().map_err(|_| Error::new( + Kind::ConfigError(format!("Couldn't parse value {} as mssql port", port)), + None, + ))?; + tconfig.port(port); + } + + if let Some(db) = &config.main.db_name { + tconfig.database(db); + } + + let user = config.main.db_user.as_deref().unwrap_or(""); + let pass = config.main.db_pass.as_deref().unwrap_or(""); + + if config.main.trust_cert == true { + tconfig.trust_cert(); + } + tconfig.authentication(AuthMethod::sql_server(&user, &pass)); + + Ok(tconfig) + } + } + } +} + #[cfg(test)] mod tests { use super::{build_db_url, Config, Kind}; diff --git a/refinery_core/src/drivers/config.rs b/refinery_core/src/drivers/config.rs index e0c5865e..4cba2d15 100644 --- a/refinery_core/src/drivers/config.rs +++ b/refinery_core/src/drivers/config.rs @@ -87,11 +87,18 @@ macro_rules! with_connection { } } } + ConfigDbType::Mssql => { + panic!("tried to synchronously migrate from config for a mssql database, but tiberius is an async driver"); + } }; } } -#[cfg(any(feature = "tokio-postgres", feature = "mysql_async"))] +#[cfg(any( + feature = "tokio-postgres", + feature = "mysql_async", + feature = "tiberius-config" +))] macro_rules! with_connection_async { ($config: ident, $op: expr) => { match $config.db_type() { @@ -111,9 +118,9 @@ macro_rules! with_connection_async { } ConfigDbType::Postgres => { cfg_if::cfg_if! { - if #[cfg(all(feature = "tokio-postgres", feature = "tokio"))] { + if #[cfg(feature = "tokio-postgres")] { let path = build_db_url("postgresql", $config); - let (client, connection ) = tokio_postgres::connect(path.as_str(), tokio_postgres::NoTls).await.migration_err("could not connect to database", None)?; + let (client, connection ) = tokio_postgres_driver::connect(path.as_str(), tokio_postgres_driver::NoTls).await.migration_err("could not connect to database", None)?; tokio::spawn(async move { if let Err(e) = connection.await { eprintln!("connection error: {}", e); @@ -121,7 +128,29 @@ macro_rules! with_connection_async { }); $op(client).await } else { - panic!("tried to migrate async from config for a postgresql database, but either tokio or tokio-postgres was not enabled!"); + panic!("tried to migrate async from config for a postgresql database, but tokio-postgres was not enabled!"); + } + } + } + ConfigDbType::Mssql => { + cfg_if::cfg_if! { + if #[cfg(feature = "tiberius-config")] { + use tiberius_driver::{Client, Config}; + use tokio::net::TcpStream; + use tokio_util::compat::TokioAsyncWriteCompatExt; + use std::convert::TryInto; + + let config: Config = (&*$config).try_into().unwrap(); + let tcp = TcpStream::connect(config.get_addr()) + .await + .migration_err("could not connect to database", None)?; + let client = Client::connect(config, tcp.compat_write()) + .await + .migration_err("could not connect to database", None)?; + + $op(client).await + } else { + panic!("tried to migrate async from config for a mssql database, but tiberius-config feature was not enabled!"); } } } @@ -172,7 +201,11 @@ impl crate::Migrate for Config { } } -#[cfg(any(feature = "mysql_async", feature = "tokio-postgres",))] +#[cfg(any( + feature = "mysql_async", + feature = "tokio-postgres", + feature = "tiberius-config" +))] #[async_trait] impl crate::AsyncMigrate for Config { async fn get_last_applied_migration(&mut self) -> Result, Error> { diff --git a/refinery_core/src/drivers/mod.rs b/refinery_core/src/drivers/mod.rs index 0f7d9ed0..867d4c4d 100644 --- a/refinery_core/src/drivers/mod.rs +++ b/refinery_core/src/drivers/mod.rs @@ -13,4 +13,7 @@ pub mod postgres; #[cfg(feature = "mysql")] pub mod mysql; +#[cfg(feature = "tiberius")] +pub mod tiberius; + mod config; diff --git a/refinery_core/src/drivers/tiberius.rs b/refinery_core/src/drivers/tiberius.rs new file mode 100644 index 00000000..1595e90b --- /dev/null +++ b/refinery_core/src/drivers/tiberius.rs @@ -0,0 +1,81 @@ +use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction}; +use crate::Migration; + +use async_trait::async_trait; +use chrono::{DateTime, Local}; +use futures::{ + io::{AsyncRead, AsyncWrite}, + TryStreamExt, +}; +use tiberius_driver::{error::Error, Client, QueryItem}; + +async fn query_applied_migrations( + client: &mut Client, + query: &str, +) -> Result, Error> { + let mut rows = client.simple_query(query).await?; + let mut applied = Vec::new(); + // Unfortunately too many unwraps as `Row::get` maps to Option instead of T + while let Some(item) = rows.try_next().await? { + if let QueryItem::Row(row) = item { + let version = row.get::(0).unwrap(); + let applied_on: &str = row.get::<&str, usize>(2).unwrap(); + let applied_on = DateTime::parse_from_rfc3339(applied_on) + .unwrap() + .with_timezone(&Local); + let checksum: String = row.get::<&str, usize>(3).unwrap().to_string(); + + applied.push(Migration::applied( + version, + row.get::<&str, usize>(1).unwrap().to_string(), + applied_on, + checksum + .parse::() + .expect("checksum must be a valid u64"), + )); + } + } + + Ok(applied) +} + +#[async_trait] +impl AsyncTransaction for Client +where + S: AsyncRead + AsyncWrite + Unpin + Send, +{ + type Error = Error; + + async fn execute(&mut self, queries: &[&str]) -> Result { + // Tiberius doesn't support transactions, see https://github.com/prisma/tiberius/issues/28 + self.simple_query("BEGIN TRAN T1;").await?; + let mut count = 0; + for query in queries { + if let Err(err) = self.execute(*query, &[]).await { + if let Err(err) = self.simple_query("ROLLBACK TRAN T1").await { + log::error!("could not ROLLBACK transaction, {}", err); + } + return Err(err); + } + count += 1; + } + self.simple_query("COMMIT TRAN T1").await?; + Ok(count as usize) + } +} + +#[async_trait] +impl AsyncQuery> for Client +where + S: AsyncRead + AsyncWrite + Unpin + Send, +{ + async fn query( + &mut self, + query: &str, + ) -> Result, ::Error> { + let applied = query_applied_migrations(self, query).await?; + Ok(applied) + } +} + +impl AsyncMigrate for Client where S: AsyncRead + AsyncWrite + Unpin + Send {} diff --git a/refinery_core/src/drivers/tokio_postgres.rs b/refinery_core/src/drivers/tokio_postgres.rs index 8a6ecf5f..e414a76b 100644 --- a/refinery_core/src/drivers/tokio_postgres.rs +++ b/refinery_core/src/drivers/tokio_postgres.rs @@ -2,8 +2,8 @@ use crate::traits::r#async::{AsyncMigrate, AsyncQuery, AsyncTransaction}; use crate::Migration; use async_trait::async_trait; use chrono::{DateTime, Local}; -use tokio_postgres::error::Error as PgError; -use tokio_postgres::{Client, Transaction as PgTransaction}; +use tokio_postgres_driver::error::Error as PgError; +use tokio_postgres_driver::{Client, Transaction as PgTransaction}; async fn query_applied_migrations( transaction: &PgTransaction<'_>, diff --git a/refinery_core/src/error.rs b/refinery_core/src/error.rs index 05f8d47a..8da023d5 100644 --- a/refinery_core/src/error.rs +++ b/refinery_core/src/error.rs @@ -60,9 +60,10 @@ pub enum Kind { /// An Error from an invalid migrations path location #[error("invalid migrations path {0}, {1}")] InvalidMigrationPath(PathBuf, std::io::Error), - /// An Error from an underlying database connection Error + /// An Error parsing refinery Config #[error("Error parsing config: {0}")] ConfigError(String), + /// An Error from an underlying database connection Error #[error("`{0}`, `{1}`")] Connection(String, #[source] Box), } diff --git a/refinery_core/src/lib.rs b/refinery_core/src/lib.rs index b0b72771..eaf039c3 100644 --- a/refinery_core/src/lib.rs +++ b/refinery_core/src/lib.rs @@ -21,10 +21,10 @@ pub use postgres; pub use mysql; #[cfg(feature = "tokio-postgres")] -pub use tokio_postgres; +pub use tokio_postgres_driver as tokio_postgres; #[cfg(feature = "mysql_async")] pub use mysql_async; -#[cfg(feature = "tokio")] -pub use tokio; +#[cfg(feature = "tiberius")] +pub use tiberius_driver as tiberius; diff --git a/refinery_core/src/traits/mod.rs b/refinery_core/src/traits/mod.rs index dbfa1c6a..2d072dcc 100644 --- a/refinery_core/src/traits/mod.rs +++ b/refinery_core/src/traits/mod.rs @@ -89,10 +89,22 @@ pub(crate) fn verify_migrations( Ok(to_be_applied) } +#[cfg(feature = "tiberius")] pub(crate) const ASSERT_MIGRATIONS_TABLE_QUERY: &str = - "CREATE TABLE IF NOT EXISTS refinery_schema_history( \ - version INT4 PRIMARY KEY,\ - name VARCHAR(255),\ + "IF NOT EXISTS(SELECT 1 FROM sys.Tables WHERE Name = N'refinery_scgema_history') + BEGIN + CREATE TABLE refinery_schema_history( + version INT PRIMARY KEY, + name VARCHAR(255), + applied_on VARCHAR(255), + checksum VARCHAR(255)); + END"; + +#[cfg(not(feature = "tiberius"))] +pub(crate) const ASSERT_MIGRATIONS_TABLE_QUERY: &str = + "CREATE TABLE IF NOT EXISTS refinery_schema_history( + version INT4 PRIMARY KEY, + name VARCHAR(255), applied_on VARCHAR(255), checksum VARCHAR(255));"; @@ -100,7 +112,7 @@ pub(crate) const GET_APPLIED_MIGRATIONS_QUERY: &str = "SELECT version, name, app FROM refinery_schema_history ORDER BY version ASC;"; pub(crate) const GET_LAST_APPLIED_MIGRATION_QUERY: &str = - "SELECT version, name, applied_on, checksum \ + "SELECT version, name, applied_on, checksum FROM refinery_schema_history WHERE version=(SELECT MAX(version) from refinery_schema_history)"; #[cfg(test)]