feat(cli): add global optimization mode for sort command
ivan-aksamentov committed Mar 7, 2025
1 parent 6892c1d commit 1f18da3
Showing 7 changed files with 513 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions packages/nextclade-cli/Cargo.toml
@@ -33,6 +33,7 @@ indexmap = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
maplit = { workspace = true }
nextclade = { path = "../nextclade" }
num_cpus = { workspace = true }
ordered-float = { workspace = true }
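For context: the new `maplit` dependency provides literal-style constructor macros for the standard map types; this commit uses `btreemap! {}` to create empty `BTreeMap`s in `DatasetSortWriter::new` and `find_best_datasets`. A minimal usage sketch (keys and values here are invented for illustration):

```rust
use maplit::btreemap;

fn main() {
  // btreemap! builds a BTreeMap from `key => value` pairs;
  // with no arguments it yields an empty map.
  let counts = btreemap! {
    "flu-a" => 2,
    "sars-cov-2" => 1,
  };
  assert_eq!(counts["flu-a"], 2);
}
```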
158 changes: 118 additions & 40 deletions packages/nextclade-cli/src/cli/nextclade_seq_sort.rs
@@ -4,14 +4,17 @@ use crate::io::http_client::HttpClient;
use eyre::{Report, WrapErr};
use itertools::Itertools;
use log::{trace, LevelFilter};
use maplit::btreemap;
use nextclade::io::csv::CsvStructFileWriter;
use nextclade::io::fasta::{FastaReader, FastaRecord, FastaWriter};
use nextclade::io::fs::path_to_string;
use nextclade::make_error;
use nextclade::sort::minimizer_index::{MinimizerIndexJson, MINIMIZER_INDEX_ALGO_VERSION};
use nextclade::sort::minimizer_search::{run_minimizer_search, MinimizerSearchDatasetResult, MinimizerSearchRecord};
use nextclade::sort::minimizer_search::{
find_best_datasets, run_minimizer_search, MinimizerSearchDatasetResult, MinimizerSearchRecord,
};
use nextclade::utils::option::{OptionMapMutFallible, OptionMapRefFallible};
use nextclade::utils::string::truncate;
use nextclade::{make_error, make_internal_report};
use ordered_float::OrderedFloat;
use owo_colors::OwoColorize;
use schemars::JsonSchema;
@@ -151,43 +154,122 @@ fn writer_thread(
verbose: bool,
) -> Result<(), Report> {
let NextcladeSortArgs {
input_fastas,
output_dir,
output_path,
output_results_tsv,
search_params,
..
} = args;

let template = output_path.map_ref_fallible(move |output_path| -> Result<TinyTemplate, Report> {
let mut template = TinyTemplate::new();
template
.add_template("output", output_path)
.wrap_err_with(|| format!("When parsing template: '{output_path}'"))?;
Ok(template)
})?;
if search_params.global {
// NOTE(perf): this gathers all suggestion results and discards the sequence data, so that we don't
// hold all sequences in memory. We will have to re-read the FASTA later to write some of the outputs.
let results: BTreeMap<usize, _> = result_receiver
.iter()
.map(|result| (result.fasta_record.index, result.result))
.collect();

// let seqs_with_no_hits = results
// .iter()
// .filter(|(_, r)| r.datasets.is_empty())
// .map(|(fasta_index, _)| fasta_index)
// .sorted_unstable()
// .copied()
// .collect_vec();

let best_datasets = find_best_datasets(&results, search_params)?;

let mut stats = StatsPrinter::new(
"Suggested datasets for each sequence (after global optimization)",
verbose,
);
let mut reader = FastaReader::from_paths(input_fastas)?;
let mut writer = DatasetSortWriter::new(output_path, output_dir, output_results_tsv)?;
loop {
let mut record = FastaRecord::default();
reader.read(&mut record)?;
if record.is_empty() {
break;
}

let mut writers = BTreeMap::new();
let mut stats = StatsPrinter::new(verbose);
let best_dataset = best_datasets
.iter()
.find(|best_dataset| best_dataset.qry_indices.contains(&record.index));

let datasets = if let Some(best_dataset) = best_dataset {
let dataset = results[&record.index]
.datasets
.iter()
.find(|d| d.name == best_dataset.name)
.ok_or_else(|| make_internal_report!("Unable to find dataset '{}'", best_dataset.name))?;
vec![dataset.clone()]
} else {
vec![]
};
stats.print_seq(&datasets, &record.seq_name);
writer.write_one(&record, &datasets)?;
}
stats.finish();
} else {
let mut stats = StatsPrinter::new("Suggested datasets for each sequence", verbose);
let mut writer = DatasetSortWriter::new(output_path, output_dir, output_results_tsv)?;
for MinimizerSearchRecord { fasta_record, result } in result_receiver {
let datasets = {
if result.datasets.is_empty() {
&[]
} else if search_params.all_matches {
&result.datasets[..]
} else {
&result.datasets[0..1]
}
};
stats.print_seq(datasets, &fasta_record.seq_name);
writer.write_one(&fasta_record, datasets)?;
}
stats.finish();
}

let mut results_csv =
output_results_tsv.map_ref_fallible(|output_results_tsv| CsvStructFileWriter::new(output_results_tsv, b'\t'))?;
Ok(())
}

for record in result_receiver {
let datasets = &{
if search_params.all_matches {
record.result.datasets
} else {
record.result.datasets.into_iter().take(1).collect_vec()
}
};
pub struct DatasetSortWriter<'t> {
writers: BTreeMap<PathBuf, FastaWriter>,
results_csv: Option<CsvStructFileWriter>,
output_dir: Option<PathBuf>,
template: Option<TinyTemplate<'t>>,
}

stats.print_seq(datasets, &record.fasta_record.seq_name);
impl<'t> DatasetSortWriter<'t> {
pub fn new(
output_path: &'t Option<String>,
output_dir: &Option<PathBuf>,
output_results_tsv: &Option<String>,
) -> Result<Self, Report> {
let template = output_path.map_ref_fallible(move |output_path| -> Result<TinyTemplate<'t>, Report> {
let mut template = TinyTemplate::new();
template
.add_template("output", output_path)
.wrap_err_with(|| format!("When parsing template: '{output_path}'"))?;
Ok(template)
})?;

let results_csv =
output_results_tsv.map_ref_fallible(|output_results_tsv| CsvStructFileWriter::new(output_results_tsv, b'\t'))?;

Ok(Self {
writers: btreemap! {},
results_csv,
output_dir: output_dir.clone(),
template,
})
}

pub fn write_one(&mut self, record: &FastaRecord, datasets: &[MinimizerSearchDatasetResult]) -> Result<(), Report> {
if datasets.is_empty() {
results_csv.map_mut_fallible(|results_csv| {
self.results_csv.map_mut_fallible(|results_csv| {
results_csv.write(&SeqSortCsvEntry {
index: record.fasta_record.index,
seq_name: &record.fasta_record.seq_name,
index: record.index,
seq_name: &record.seq_name,
dataset: None,
score: None,
num_hits: None,
@@ -196,38 +278,34 @@ fn writer_thread(
}

for dataset in datasets {
results_csv.map_mut_fallible(|results_csv| {
self.results_csv.map_mut_fallible(|results_csv| {
results_csv.write(&SeqSortCsvEntry {
index: record.fasta_record.index,
seq_name: &record.fasta_record.seq_name,
index: record.index,
seq_name: &record.seq_name,
dataset: Some(&dataset.name),
score: Some(dataset.score),
num_hits: Some(dataset.n_hits),
})
})?;
}

let names = datasets
let dataset_names = datasets
.iter()
.map(|dataset| get_all_prefix_names(&dataset.name))
.collect::<Result<Vec<Vec<String>>, Report>>()?
.into_iter()
.flatten()
.unique();

for name in names {
let filepath = get_filepath(&name, &template, output_dir)?;

for name in dataset_names {
let filepath = get_filepath(&name, &self.template, &self.output_dir)?;
if let Some(filepath) = filepath {
let writer = get_or_insert_writer(&mut writers, filepath)?;
writer.write(&record.fasta_record.seq_name, &record.fasta_record.seq, false)?;
let writer = get_or_insert_writer(&mut self.writers, filepath)?;
writer.write(&record.seq_name, &record.seq, false)?;
}
}
Ok(())
}

stats.finish();

Ok(())
}

pub fn get_all_prefix_names(name: impl AsRef<str>) -> Result<Vec<String>, Report> {
@@ -250,9 +328,9 @@ struct StatsPrinter {
}

impl StatsPrinter {
fn new(enabled: bool) -> Self {
fn new(title: impl AsRef<str>, enabled: bool) -> Self {
if enabled {
println!("Suggested datasets for each sequence");
println!("{}", title.as_ref());
println!("{}┐", "─".repeat(110));
println!(
"{:^40} │ {:^40} │ {:^10} │ {:^10} │",
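Aside: `write_one` fans each record out to every prefix of each matched dataset name. The diff does not show `get_all_prefix_names` itself, so the helper below is a hypothetical stand-in sketching the assumed behavior: expanding a `/`-separated dataset name into all of its path prefixes, so that a sequence is also written into the outputs of ancestor datasets.

```rust
// Hypothetical stand-in for `get_all_prefix_names` (not shown in this diff):
// expands "a/b/c" into ["a", "a/b", "a/b/c"].
fn all_prefix_names(name: &str) -> Vec<String> {
  let parts: Vec<&str> = name.split('/').collect();
  (1..=parts.len()).map(|i| parts[..i].join("/")).collect()
}

fn main() {
  let prefixes = all_prefix_names("nextstrain/sars-cov-2/XBB");
  assert_eq!(
    prefixes,
    vec!["nextstrain", "nextstrain/sars-cov-2", "nextstrain/sars-cov-2/XBB"]
  );
}
```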
4 changes: 2 additions & 2 deletions packages/nextclade/src/io/compression.rs
@@ -5,7 +5,7 @@ use eyre::{Report, WrapErr};
use flate2::read::MultiGzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression as GzCompressionLevel;
use log::debug;
use log::trace;
use std::env;
use std::io::{ErrorKind, Read, Write};
use std::path::Path;
@@ -65,7 +65,7 @@ pub fn guess_compression_from_filepath(filepath: impl AsRef<Path>) -> (Compressi
_ => CompressionType::None,
};

debug!(
trace!(
"When processing '{filepath:#?}': detected file extension '{ext}'. \
It will be using algorithm: '{compression_type}'"
);
92 changes: 91 additions & 1 deletion packages/nextclade/src/sort/minimizer_search.rs
@@ -1,12 +1,16 @@
use crate::io::fasta::FastaRecord;
use crate::make_internal_report;
use crate::sort::minimizer_index::{MinimizerIndexJson, MinimizerIndexParams};
use crate::sort::params::NextcladeSeqSortParams;
use crate::utils::map::key_of_max_value;
use eyre::Report;
use itertools::{izip, Itertools};
use log::debug;
use maplit::btreemap;
use ordered_float::OrderedFloat;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use std::collections::{BTreeMap, BTreeSet};

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
@@ -96,6 +100,92 @@ pub fn run_minimizer_search(
})
}

pub fn find_best_datasets(
results: &BTreeMap<usize, MinimizerSearchResult>,
params: &NextcladeSeqSortParams,
) -> Result<Vec<DatasetSuggestionStats>, Report> {
let mut unmatched: BTreeSet<_> = results
.iter()
.filter(|(_, r)| !r.datasets.is_empty())
.map(|(i, _)| i)
.copied()
.collect();

let mut suggestions = vec![];
let mut top_hit_matches = 0;
let mut total_matches = 0;

for i in 0..params.max_iter {
let mut hit_by_dataset = btreemap! {};
let mut top_hit_by_dataset = btreemap! {};

for qry in &unmatched {
let hits = &results[qry].datasets;
for hit in hits {
*hit_by_dataset.entry(&hit.name).or_insert(0) += 1;
}
if !hits.is_empty() {
*top_hit_by_dataset.entry(&hits[0].name).or_insert(0) += 1;
}
}

let mut best_dataset = *key_of_max_value(&hit_by_dataset)
.ok_or_else(|| make_internal_report!("find_best_datasets: no matching best dataset found"))?;

if hit_by_dataset[best_dataset] == 0 {
debug!("partition_sequences: no more hits");
break;
}

let best_top_dataset = *key_of_max_value(&top_hit_by_dataset)
.ok_or_else(|| make_internal_report!("find_best_datasets: no matching best top dataset found"))?;

if hit_by_dataset[best_top_dataset] == hit_by_dataset[best_dataset] {
best_dataset = best_top_dataset;
};

let matched: BTreeSet<_> = results
.iter()
.filter(|(_, res)| res.datasets.iter().any(|dataset| &dataset.name == best_dataset))
.map(|(i, _)| i)
.copied()
.collect();

unmatched = unmatched.difference(&matched).copied().collect();

// NOTE: `best_dataset` need not be the top hit of any query, so avoid indexing
// `top_hit_by_dataset` directly, which would panic on a missing key
let n_top_hits = top_hit_by_dataset.get(best_dataset).copied().unwrap_or(0);
top_hit_matches += n_top_hits;
total_matches += hit_by_dataset[best_dataset];

debug!("Global search: iteration {}", i);
debug!(
"Global search: added dataset '{}' ({} hits, {} top hits)",
best_dataset, hit_by_dataset[best_dataset], n_top_hits
);
debug!("Global search: hit matches {}", total_matches);
debug!("Global search: top hit matches {}", top_hit_matches);
debug!("Global search: unmatched remaining {}", unmatched.len());

suggestions.push(DatasetSuggestionStats {
name: best_dataset.to_owned(),
n_hits: hit_by_dataset[best_dataset],
qry_indices: matched.iter().copied().collect(),
});

if unmatched.is_empty() {
break;
}
}

let suggestions = suggestions.into_iter().sorted_by_key(|s| s.n_hits).collect_vec();
Ok(suggestions)
}

pub struct DatasetSuggestionStats {
pub name: String,
pub n_hits: usize,
pub qry_indices: Vec<usize>,
}

const fn invertible_hash(x: u64) -> u64 {
let m: u64 = (1 << 32) - 1;
let mut x: u64 = (!x).wrapping_add(x << 21) & m;
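To make the selection strategy concrete, here is a toy, self-contained walk-through of the greedy set-cover idea behind `find_best_datasets`: repeatedly pick the dataset that covers the most still-unmatched queries, drop those queries, and repeat. It deliberately omits the real function's top-hit tie-breaking and `max_iter` cap, and the dataset names and hit lists are invented for illustration.

```rust
use std::collections::{BTreeMap, BTreeSet};

fn main() {
  // Query index -> names of datasets it matched, best hit first.
  let hits: BTreeMap<usize, Vec<&str>> = BTreeMap::from([
    (0, vec!["flu-a", "flu-b"]),
    (1, vec!["flu-a"]),
    (2, vec!["sars-cov-2"]),
  ]);

  let mut unmatched: BTreeSet<usize> = hits.keys().copied().collect();
  let mut picked = vec![];

  while !unmatched.is_empty() {
    // Count, among still-unmatched queries, how often each dataset is hit.
    let mut counts: BTreeMap<&str, usize> = BTreeMap::new();
    for q in &unmatched {
      for d in &hits[q] {
        *counts.entry(*d).or_insert(0) += 1;
      }
    }
    // Greedily take the dataset that covers the most unmatched queries.
    let Some((&best, _)) = counts.iter().max_by_key(|(_, n)| **n) else { break };
    unmatched.retain(|q| !hits[q].contains(&best));
    picked.push(best);
  }

  // "flu-a" covers queries 0 and 1; "sars-cov-2" covers query 2.
  assert_eq!(picked, vec!["flu-a", "sars-cov-2"]);
}
```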