diff --git a/src/deploy/mod.rs b/src/deploy/mod.rs index 6166011..a390a37 100644 --- a/src/deploy/mod.rs +++ b/src/deploy/mod.rs @@ -224,6 +224,8 @@ async fn recalculate_mode_scores( rx: i32, ctx: Arc, mods_value: Option, + mapper_filter: Option, + map_filter: Option>, ) -> anyhow::Result<()> { let scores_table = match rx { 0 => "scores", @@ -237,13 +239,35 @@ async fn recalculate_mode_scores( None => "".to_string(), }; - let beatmap_md5s: Vec<(String,)> = sqlx::query_as(&format!( - "SELECT beatmap_md5, COUNT(*) AS c FROM {} WHERE completed IN (2, 3) AND play_mode = ? {} GROUP BY beatmap_md5 ORDER BY c DESC", - scores_table, mods_query_str, - )) - .bind(mode) - .fetch_all(ctx.database.get().await?.deref_mut()) - .await?; + let beatmap_md5s: Vec<(String,)> = if let Some(mapper_filter) = mapper_filter { + sqlx::query_as(&format!( + "SELECT beatmap_md5, COUNT(*) AS c FROM {} INNER JOIN beatmaps USING(beatmap_md5) + WHERE completed IN (2, 3) AND play_mode = ? {} AND beatmaps.file_name LIKE ? GROUP BY beatmap_md5 ORDER BY c DESC", + scores_table, mods_query_str, + )) + .bind(mode) + .bind(format!("%({mapper_filter})%")) + .fetch_all(ctx.database.get().await?.deref_mut()) + .await? + } else if let Some(map_filter) = map_filter { + sqlx::query_as(&format!( + "SELECT beatmap_md5, COUNT(*) AS c FROM {} INNER JOIN beatmaps USING(beatmap_md5) + WHERE completed IN (2, 3) AND play_mode = ? {} AND beatmaps.beatmap_id IN ? GROUP BY beatmap_md5 ORDER BY c DESC", + scores_table, mods_query_str, + )) + .bind(mode) + .bind(map_filter) + .fetch_all(ctx.database.get().await?.deref_mut()) + .await? + } else { + sqlx::query_as(&format!( + "SELECT beatmap_md5, COUNT(*) AS c FROM {} WHERE completed IN (2, 3) AND play_mode = ? {} GROUP BY beatmap_md5 ORDER BY c DESC", + scores_table, mods_query_str, + )) + .bind(mode) + .fetch_all(ctx.database.get().await?.deref_mut()) + .await? + }; let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_BEATMAP_TASKS)); @@ -372,6 +396,8 @@ async fn recalculate_statuses( mode: i32, rx: i32, ctx: Arc, + mapper_filter: Option, + map_filter: Option>, ) -> anyhow::Result<()> { let scores_table = match rx { 0 => "scores", @@ -380,16 +406,44 @@ async fn recalculate_statuses( _ => unreachable!(), }; - let beatmap_md5s: Vec<(String,)> = sqlx::query_as( - &format!( - "SELECT DISTINCT beatmap_md5 FROM {} WHERE userid = ? AND completed IN (2, 3) AND play_mode = ?", - scores_table + let beatmap_md5s: Vec<(String,)> = if let Some(mapper_filter) = mapper_filter { + sqlx::query_as( + &format!( + "SELECT DISTINCT beatmap_md5 FROM {} INNER JOIN beatmaps USING(beatmap_md5) + WHERE userid = ? AND completed IN (2, 3) AND play_mode = ? AND beatmaps.file_name LIKE ?", + scores_table + ) ) - ) - .bind(user_id) - .bind(mode) - .fetch_all(ctx.database.get().await?.deref_mut()) - .await?; + .bind(user_id) + .bind(mode) + .bind(format!("%({mapper_filter})%")) + .fetch_all(ctx.database.get().await?.deref_mut()) + .await? + } else if let Some(map_filter) = map_filter { + sqlx::query_as( + &format!( + "SELECT DISTINCT beatmap_md5 FROM {} INNER JOIN beatmaps USING(beatmap_md5) + WHERE userid = ? AND completed IN (2, 3) AND play_mode = ? AND beatmaps.beatmap_id IN ?", + scores_table + ) + ) + .bind(user_id) + .bind(mode) + .bind(map_filter) + .fetch_all(ctx.database.get().await?.deref_mut()) + .await? + } else { + sqlx::query_as( + &format!( + "SELECT DISTINCT beatmap_md5 FROM {} WHERE userid = ? AND completed IN (2, 3) AND play_mode = ?", + scores_table + ) + ) + .bind(user_id) + .bind(mode) + .fetch_all(ctx.database.get().await?.deref_mut()) + .await? + }; for (beatmap_md5,) in beatmap_md5s { recalculate_status(user_id, mode, rx, beatmap_md5, ctx.clone()).await?; @@ -588,6 +642,8 @@ struct DeployArgs { relax_bits: Vec, total_pp_only: bool, mods_filter: Option, + mapper_filter: Option, + map_filter: Option>, } fn deploy_args_from_env() -> anyhow::Result { @@ -595,6 +651,8 @@ fn deploy_args_from_env() -> anyhow::Result { let relax_bits_str = std::env::var("DEPLOY_RELAX_BITS")?; let total_pp_only_str = std::env::var("DEPLOY_TOTAL_PP_ONLY").unwrap_or("".to_string()); let mods_filter_str = std::env::var("DEPLOY_MODS_FILTER").ok(); + let mapper_filter_str = std::env::var("DEPLOY_MAPPER_FILTER").ok(); + let map_filter_str = std::env::var("DEPLOY_MAP_FILTER").ok(); Ok(DeployArgs { modes: modes_str @@ -610,6 +668,14 @@ fn deploy_args_from_env() -> anyhow::Result { total_pp_only: total_pp_only_str.to_lowercase().trim() == "1", mods_filter: mods_filter_str .map(|mods| mods.trim().parse::().expect("failed to parse mods")), + mapper_filter: mapper_filter_str, + map_filter: map_filter_str.map(|map_filter| { + map_filter.trim().split(',').map(|s| { + s.parse::() + .expect("failed to parse map") + .collect::>() + }) + }), }) } @@ -680,11 +746,59 @@ fn deploy_args_from_input() -> anyhow::Result { std::io::stdout().flush()?; } + print!("Mapper recalc only (y/n): "); + std::io::stdout().flush()?; + + let mut mapper_recalc_only_str = String::new(); + std::io::stdin().read_line(&mut mapper_recalc_only_str)?; + let mapper_recalc_only = mapper_recalc_only_str.to_lowercase().trim() == "y"; + + print!("\n"); + std::io::stdout().flush()?; + + let mut mapper_filter: Option> = None; + if mapper_recalc_only { + print!("Mappers (comma delimited string): "); + std::io::stdout().flush()?; + + let mut mapper_str = String::new(); + std::io::stdin().read_line(&mut mapper_str)?; + mapper_filter = Some(mapper_str.trim()); + + print!("\n"); + std::io::stdout().flush()?; + } + + print!("Map recalc only (y/n): "); + std::io::stdout().flush()?; + + let mut map_recalc_only_str = String::new(); + std::io::stdin().read_line(&mut map_recalc_only_str)?; + let map_recalc_only = map_recalc_only_str.to_lowercase().trim() == "y"; + + print!("\n"); + std::io::stdout().flush()?; + + let mut map_filter: Option> = None; + if map_recalc_only { + print!("Maps (comma delimited IDs): "); + std::io::stdout().flush()?; + + let mut map_str = String::new(); + std::io::stdin().read_line(&mut map_str)?; + map_filter = Some(map_str.trim().split(',').map(|s| s.parse().expect("failed to parse map")).collect::>()); + + print!("\n"); + std::io::stdout().flush()?; + } + Ok(DeployArgs { modes, relax_bits, total_pp_only: total_only, mods_filter: mods_value, + mapper_filter, + map_filter, }) } @@ -721,7 +835,7 @@ pub async fn serve(context: Context) -> anyhow::Result<()> { .await?; } } else { - recalculate_mode_scores(mode, 0, context_arc.clone(), deploy_args.mods_filter) + recalculate_mode_scores(mode, 0, context_arc.clone(), deploy_args.mods_filter, deploy_args.mapper_filter) .await?; } } diff --git a/src/mass_recalc/mod.rs b/src/mass_recalc/mod.rs index 9b83f42..09982a3 100644 --- a/src/mass_recalc/mod.rs +++ b/src/mass_recalc/mod.rs @@ -8,6 +8,7 @@ use crate::{ usecases, }; +use lapin::options::QueuePurgeOptions; use lapin::{options::BasicPublishOptions, BasicProperties}; use redis::AsyncCommands; @@ -132,6 +133,11 @@ pub async fn serve(context: Context) -> anyhow::Result<()> { .await? .expect("failed to find rework"); + context + .amqp_channel + .queue_purge("rework_queue", QueuePurgeOptions::default()) + .await?; + sqlx::query("DELETE FROM rework_scores WHERE rework_id = ?") .bind(mass_recalc_args.rework_id) .execute(context.database.get().await?.deref_mut())