Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iterable run method #296

Merged
merged 23 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
store connection handle in runner
  • Loading branch information
superstator committed Nov 11, 2023
commit f3a5d39968efcf435a7b4db738857a94ec032fa4
9 changes: 8 additions & 1 deletion examples/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,12 @@ mod embedded {

fn main() {
let mut conn = Connection::open_in_memory().unwrap();
embedded::migrations::runner().run(&mut conn).unwrap();
embedded::migrations::runner(&mut conn).run().unwrap();
}

fn iter_main() {
superstator marked this conversation as resolved.
Show resolved Hide resolved
let mut conn = Connection::open_in_memory().unwrap();
for migration in embedded::migrations::runner(&mut conn) {
info!("Got a migration: {}", migration.expect("migration failed!"));
}
}
8 changes: 4 additions & 4 deletions refinery_cli/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ fn run_migrations(
.context("Can't start tokio runtime")?;

runtime.block_on(async {
Runner::new(&migrations)
Runner::new(&migrations, &mut config)
.set_grouped(grouped)
.set_target(target)
.set_abort_divergent(divergent)
.set_abort_missing(missing)
.set_migration_table_name(table_name)
.run_async(&mut config)
.run_async()
.await
})?;
} else {
Expand All @@ -89,13 +89,13 @@ fn run_migrations(
_db_type @ (ConfigDbType::Mysql | ConfigDbType::Postgres | ConfigDbType::Sqlite) => {
cfg_if::cfg_if! {
if #[cfg(any(feature = "mysql", feature = "postgresql", feature = "sqlite"))] {
Runner::new(&migrations)
Runner::new(&migrations, &mut config)
.set_grouped(grouped)
.set_abort_divergent(divergent)
.set_abort_missing(missing)
.set_target(target)
.set_migration_table_name(table_name)
.run(&mut config)?;
.run()?;
} else {
panic!("tried to migrate async from config for a {:?} database, but it's matching feature was not enabled!", _db_type);
}
Expand Down
139 changes: 87 additions & 52 deletions refinery_core/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use regex::Regex;
use siphasher::sip::SipHasher13;
use time::OffsetDateTime;

use log::error;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt;
use std::hash::{Hash, Hasher};

use crate::error::Kind;
use crate::traits::{DEFAULT_MIGRATION_TABLE_NAME, sync::migrate as sync_migrate};
use crate::traits::{sync::migrate as sync_migrate, DEFAULT_MIGRATION_TABLE_NAME};
use crate::{AsyncMigrate, Error, Migrate};
use std::fmt::Formatter;

Expand Down Expand Up @@ -241,7 +242,8 @@ impl Report {
/// `Runner` should not need to be instantiated manually
///
/// [`embed_migrations!`]: macro.embed_migrations.html
pub struct Runner {
pub struct Runner<'a, C> {
connection: &'a mut C,
grouped: bool,
abort_divergent: bool,
abort_missing: bool,
Expand All @@ -250,10 +252,11 @@ pub struct Runner {
migration_table_name: String,
}

impl Runner {
impl<'a, C> Runner<'a, C> {
/// instantiate a new Runner
pub fn new(migrations: &[Migration]) -> Runner {
pub fn new(migrations: &[Migration], connection: &'a mut C) -> Runner<'a, C> {
Runner {
connection,
grouped: false,
target: Target::Latest,
abort_divergent: true,
Expand All @@ -272,7 +275,7 @@ impl Runner {
/// Version migrates to a user provided version, a Version with a higher version than the latest will be ignored,
/// and Fake doesn't actually run any migration, just creates and updates refinery's schema migration table
/// by default this is set to Latest
pub fn set_target(self, target: Target) -> Runner {
pub fn set_target(self, target: Target) -> Runner<'a, C> {
Runner { target, ..self }
}

Expand All @@ -283,14 +286,14 @@ impl Runner {
///
/// set_grouped won't probably work on MySQL Databases as MySQL lacks support for transactions around schema alteration operations,
/// meaning that if a migration fails to apply you will have to manually unpick the changes in order to try again (it’s impossible to roll back to an earlier point).
pub fn set_grouped(self, grouped: bool) -> Runner {
pub fn set_grouped(self, grouped: bool) -> Runner<'a, C> {
Runner { grouped, ..self }
}

/// Set true if migration process should abort if divergent migrations are found
/// i.e. applied migrations with the same version but different name or checksum from the ones on the filesystem.
/// by default this is set to true
pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner {
pub fn set_abort_divergent(self, abort_divergent: bool) -> Runner<'a, C> {
Runner {
abort_divergent,
..self
Expand All @@ -301,23 +304,23 @@ impl Runner {
/// i.e. applied migrations that are not found on the filesystem,
/// or migrations found on filesystem with a version inferior to the last one applied but not applied.
/// by default this is set to true
pub fn set_abort_missing(self, abort_missing: bool) -> Runner {
pub fn set_abort_missing(self, abort_missing: bool) -> Runner<'a, C> {
Runner {
abort_missing,
..self
}
}

/// Queries the database for the last applied migration, returns None if there aren't applied Migrations
pub fn get_last_applied_migration<C>(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
pub fn get_last_applied_migration(&self, conn: &'_ mut C) -> Result<Option<Migration>, Error>
where
C: Migrate,
{
Migrate::get_last_applied_migration(conn, &self.migration_table_name)
}

/// Queries the database asynchronously for the last applied migration, returns None if there aren't applied Migrations
pub async fn get_last_applied_migration_async<C>(
pub async fn get_last_applied_migration_async(
&self,
conn: &mut C,
) -> Result<Option<Migration>, Error>
Expand All @@ -328,18 +331,15 @@ impl Runner {
}

/// Queries the database for all previous applied migrations
pub fn get_applied_migrations<C>(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
pub fn get_applied_migrations(&self, conn: &'_ mut C) -> Result<Vec<Migration>, Error>
where
C: Migrate,
{
Migrate::get_applied_migrations(conn, &self.migration_table_name)
}

/// Queries the database asynchronously for all previous applied migrations
pub async fn get_applied_migrations_async<C>(
&self,
conn: &mut C,
) -> Result<Vec<Migration>, Error>
pub async fn get_applied_migrations_async(&self, conn: &mut C) -> Result<Vec<Migration>, Error>
where
C: AsyncMigrate + Send,
{
Expand Down Expand Up @@ -367,21 +367,23 @@ impl Runner {
self
}

/// Creates an iterator over the migrations, running each in the
/// supplied database connection before returning them from `next()`
pub fn run_iter<'a, C>(&self, conn: &'a mut C) -> MigrationIterator<'a, C> where C: Migrate {
MigrationIterator::new(self, conn)
/// Creates an iterator over pending migrations, applying each before returning
/// the result from `next()`. If a migration fails, the iterator will return that
/// result and further calls to `next()` will return `None`.
pub fn into_iter(self) -> RunIterator<'a, C>
where
C: Migrate,
{
RunIterator::new(self)
}



/// Runs the Migrations in the supplied database connection
pub fn run<C>(&self, conn: &'_ mut C) -> Result<Report, Error>
pub fn run(&'a mut self) -> Result<Report, Error>
where
C: Migrate,
{
Migrate::migrate(
conn,
self.connection,
&self.migrations,
self.abort_divergent,
self.abort_missing,
Expand All @@ -392,12 +394,12 @@ impl Runner {
}

/// Runs the Migrations asynchronously in the supplied database connection
pub async fn run_async<C>(&self, conn: &mut C) -> Result<Report, Error>
pub async fn run_async(&'a mut self) -> Result<Report, Error>
where
C: AsyncMigrate + Send,
{
AsyncMigrate::migrate(
conn,
self.connection,
&self.migrations,
self.abort_divergent,
self.abort_missing,
Expand All @@ -409,40 +411,73 @@ impl Runner {
}
}

pub struct MigrationIterator<'a, C> {
impl<'a, C> IntoIterator for Runner<'a, C>
where
C: Migrate,
{
type Item = Result<Migration, Error>;
type IntoIter = RunIterator<'a, C>;

fn into_iter(self) -> Self::IntoIter {
Runner::into_iter(self)
}
}

pub struct RunIterator<'a, C> {
connection: &'a mut C,
target: Target,
migration_table_name: String,
items: VecDeque<Migration>
items: VecDeque<Migration>,
failed: bool,
}
impl<'a, C> MigrationIterator<'a, C> where C: Migrate {
pub(crate) fn new(runner: &Runner, connection: &'a mut C) -> MigrationIterator<'a, C> {
Self {
items: VecDeque::from(Migrate::get_unapplied_migrations(
connection,
&runner.migrations,
runner.abort_divergent,
runner.abort_missing,
&runner.migration_table_name).unwrap()),
connection,
target: runner.target,
migration_table_name: runner.migration_table_name.clone(),
}
impl<'a, C> RunIterator<'a, C>
where
C: Migrate,
{
pub(crate) fn new(runner: Runner<'a, C>) -> RunIterator<'a, C> {
RunIterator {
items: VecDeque::from(
Migrate::get_unapplied_migrations(
runner.connection,
&runner.migrations,
runner.abort_divergent,
runner.abort_missing,
&runner.migration_table_name,
)
.unwrap(),
),
connection: runner.connection,
target: runner.target,
migration_table_name: runner.migration_table_name.clone(),
failed: false,
}
}
}
impl<C> Iterator for MigrationIterator<'_, C> where C: Migrate {
type Item = Migration;
impl<C> Iterator for RunIterator<'_, C>
where
C: Migrate,
{
type Item = Result<Migration, Error>;

fn next(&mut self) -> Option<Self::Item> {
self.items.pop_front().map(|migration|
sync_migrate(self.connection,
vec![migration],
self.target,
&self.migration_table_name))
.transpose()
.unwrap_or(None)
.and_then(|r| r.applied_migrations
.first()
.cloned())
match self.failed {
true => None,
false => {
self.items.pop_front().map(|migration| {
sync_migrate(
self.connection,
vec![migration],
self.target,
&self.migration_table_name,
)
.map(|r| r.applied_migrations.first().cloned().unwrap())
.map_err(|e| {
error!("migration failed: {e:?}");
self.failed = true;
e
})
})
}
}
}
}
8 changes: 4 additions & 4 deletions refinery_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ pub(crate) fn crate_root() -> PathBuf {
fn migration_fn_quoted<T: ToTokens>(_migrations: Vec<T>) -> TokenStream2 {
let result = quote! {
use refinery::{Migration, Runner};
pub fn runner() -> Runner {
pub fn runner<'a, C>(connection: &'a mut C) -> Runner<'a, C> {
let quoted_migrations: Vec<(&str, String)> = vec![#(#_migrations),*];
let mut migrations: Vec<Migration> = Vec::new();
for module in quoted_migrations.into_iter() {
migrations.push(Migration::unapplied(module.0, &module.1).unwrap());
}
Runner::new(&migrations)
Runner::new(&migrations, connection)
}
};
result
Expand Down Expand Up @@ -103,13 +103,13 @@ mod tests {
let migs = vec![quote!("V1__first", "valid_sql_file")];
let expected = concat! {
"use refinery :: { Migration , Runner } ; ",
"pub fn runner () -> Runner { ",
"pub fn runner < 'a , C > (connection : & 'a mut C) -> Runner < 'a , C > { ",
"let quoted_migrations : Vec < (& str , String) > = vec ! [\"V1__first\" , \"valid_sql_file\"] ; ",
"let mut migrations : Vec < Migration > = Vec :: new () ; ",
"for module in quoted_migrations . into_iter () { ",
"migrations . push (Migration :: unapplied (module . 0 , & module . 1) . unwrap ()) ; ",
"} ",
"Runner :: new (& migrations) }"
"Runner :: new (& migrations , connection) }"
};
assert_eq!(expected, migration_fn_quoted(migs).to_string());
}
Expand Down
Loading