diff --git a/bigtools/src/utils/cli/bigbedtobed.rs b/bigtools/src/utils/cli/bigbedtobed.rs index 25d6e22..4229717 100644 --- a/bigtools/src/utils/cli/bigbedtobed.rs +++ b/bigtools/src/utils/cli/bigbedtobed.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::error::Error; use std::fs::File; use std::io::{self, BufReader, Write}; @@ -6,6 +7,7 @@ use std::path::Path; use clap::Parser; use futures::FutureExt; use tokio::runtime; +use ufmt::uwrite; use crate::utils::reopen::{Reopen, SeekableRead}; use crate::utils::streaming_linereader::StreamingLineReader; @@ -44,6 +46,12 @@ pub struct BigBedToBedArgs { #[arg(short = 't', long)] #[arg(default_value_t = 6)] pub nthreads: usize, + + /// Do not create temporary files for intermediate data. (Only applicable when using multiple threads.) + /// By default, approximately one temporary file will be opened for each core. + #[arg(long)] + #[arg(default_value_t = false)] + pub inmemory: bool, } pub fn bigbedtobed(args: BigBedToBedArgs) -> Result<(), Box> { @@ -75,80 +83,130 @@ pub fn bigbedtobed(args: BigBedToBedArgs) -> Result<(), Box> { write_bed_from_bed(bigbed, bed, overlap_bed)?; } None => { - write_bed(bigbed, bed, nthreads, args.chrom, args.start, args.end)?; + // Right now, we don't offload decompression to separate threads, + // so specifying `chrom` effectively means single-threaded + if nthreads == 1 || args.chrom.is_some() { + write_bed_singlethreaded(bigbed, bed, args.chrom, args.start, args.end)?; + } else { + write_bed(bigbed, bed, args.inmemory, nthreads)?; + } } } Ok(()) } -pub fn write_bed( - bigbed: BigBedRead, - mut out_file: File, - nthreads: usize, +pub fn write_bed_singlethreaded( + mut bigbed: BigBedRead, + out_file: File, chrom: Option, start: Option, end: Option, ) -> Result<(), BBIReadError> { - let runtime = if nthreads == 1 { - runtime::Builder::new_current_thread().build().unwrap() + let start = chrom.as_ref().and_then(|_| start); + let end = chrom.as_ref().and_then(|_| end); + + let chroms: Vec = if let Some(arg_chrom) = chrom { + let chrom = bigbed.chroms().iter().find(|c| c.name == arg_chrom); + let Some(chrom) = chrom else { + eprintln!("{arg_chrom} not found in file."); + return Ok(()); + }; + vec![chrom.clone()] } else { - runtime::Builder::new_multi_thread() - .worker_threads(nthreads) - .build() - .unwrap() + bigbed.chroms().to_vec() }; + let mut writer = io::BufWriter::with_capacity(32 * 1000, out_file); + for chrom in chroms { + let start = start.unwrap_or(0); + let end = end.unwrap_or(chrom.length); + for raw_val in bigbed.get_interval(&chrom.name, start, end)? { + let val = raw_val?; + let end = if !val.rest.is_empty() { + format!("\t{}\n", val.rest) + } else { + "\n".to_string() + }; + let mut buf = String::with_capacity(50); // Estimate + uwrite!( + &mut buf, + "{}\t{}\t{}\t{}\n", + chrom.name, + val.start, + val.end, + end, + ) + .unwrap(); + writer.write(buf.as_bytes())?; + } + } + Ok(()) +} + +pub fn write_bed( + bigbed: BigBedRead, + mut out_file: File, + inmemory: bool, + nthreads: usize, +) -> Result<(), BBIReadError> { + let runtime = runtime::Builder::new_multi_thread() + .worker_threads(nthreads) + .build() + .unwrap(); + + let mut remaining_chroms = bigbed.chroms().to_vec(); + remaining_chroms.reverse(); + + let mut chrom_files: VecDeque<_> = VecDeque::new(); + + async fn file_future( + mut bigbed: BigBedRead, + chrom: ChromInfo, + mut writer: io::BufWriter>, + ) -> Result<(), BBIReadError> { + let mut buf: String = String::with_capacity(50); // Estimate + for raw_val in bigbed.get_interval(&chrom.name, 0, chrom.length)? { + let val = raw_val?; + let end = if !val.rest.is_empty() { + format!("\t{}\n", val.rest) + } else { + "\n".to_string() + }; + uwrite!( + &mut buf, + "{}\t{}\t{}\t{}\n", + chrom.name, + val.start, + val.end, + end, + ) + .unwrap(); + writer.write(buf.as_bytes())?; + buf.clear(); + } + Ok(()) + } + + loop { + while chrom_files.len() < nthreads { + let Some(chrom) = remaining_chroms.pop() else { + break; + }; - let chrom_files: Vec)>> = bigbed - .chroms() - .into_iter() - .cloned() - .filter(|c| chrom.as_ref().map_or(true, |chrom| &c.name == chrom)) - .map(|chrom| { let bigbed = bigbed.reopen()?; let (buf, file): (TempFileBuffer, TempFileBufferWriter) = - TempFileBuffer::new(true); + TempFileBuffer::new(inmemory); let writer = io::BufWriter::new(file); - async fn file_future( - mut bigbed: BigBedRead, - chrom: ChromInfo, - mut writer: io::BufWriter>, - start: Option, - end: Option, - ) -> Result<(), BBIReadError> { - for raw_val in bigbed.get_interval(&chrom.name, 0, chrom.length)? { - let val = raw_val?; - if let Some(start) = start { - if val.start <= start { - continue; - } - } - if let Some(end) = end { - if val.start > end { - continue; - } - } - let end = if !val.rest.is_empty() { - format!("\t{}\n", val.rest) - } else { - "\n".to_string() - }; - writer.write_fmt(format_args!( - "{}\t{}\t{}{}", - chrom.name, val.start, val.end, end - ))?; - } - Ok(()) - } let handle = runtime - .spawn(file_future(bigbed, chrom, writer, start, end)) + .spawn(file_future(bigbed, chrom, writer)) .map(|f| f.unwrap()); - Ok((handle, buf)) - }) - .collect::>(); + chrom_files.push_back((handle, buf)); + } + + let Some((f, mut buf)) = chrom_files.pop_front() else { + break; + }; - for res in chrom_files { - let (f, mut buf) = res.unwrap(); buf.switch(out_file); runtime.block_on(f).unwrap(); out_file = buf.await_real_file(); @@ -172,13 +230,17 @@ pub fn write_bed_from_bed( let start = split.next().expect("Missing start").parse::().unwrap(); let end = split.next().expect("Missing end").parse::().unwrap(); for raw_val in bigbed.get_interval(chrom, start, end)? { - let val = raw_val?; + let mut val = raw_val?; + val.start = val.start.max(start); + val.end = val.end.min(end); let end = if !val.rest.is_empty() { format!("\t{}\n", val.rest) } else { "\n".to_string() }; - writer.write_fmt(format_args!("{}\t{}\t{}{}", chrom, val.start, val.end, end))?; + let mut buf = String::with_capacity(50); // Estimate + uwrite!(&mut buf, "{}\t{}\t{}\t{}\n", chrom, val.start, val.end, end,).unwrap(); + writer.write(buf.as_bytes())?; } } diff --git a/bigtools/src/utils/cli/bigwigtobedgraph.rs b/bigtools/src/utils/cli/bigwigtobedgraph.rs index 743153f..458d2a3 100644 --- a/bigtools/src/utils/cli/bigwigtobedgraph.rs +++ b/bigtools/src/utils/cli/bigwigtobedgraph.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::error::Error; use std::fs::File; use std::io::{self, BufReader, Write}; @@ -83,23 +84,12 @@ pub fn bigwigtobedgraph(args: BigWigToBedGraphArgs) -> Result<(), Box write_bg_from_bed(bigwig, bedgraph, overlap_bed)?; } None => { - if nthreads == 1 { + // Right now, we don't offload decompression to separate threads, + // so specifying `chrom` effectively means single-threaded + if nthreads == 1 || args.chrom.is_some() { write_bg_singlethreaded(bigwig, bedgraph, args.chrom, args.start, args.end)?; } else { - let runtime = runtime::Builder::new_multi_thread() - .worker_threads(nthreads) - .build() - .unwrap(); - - runtime.block_on(write_bg( - bigwig, - bedgraph, - args.chrom, - args.start, - args.end, - runtime.handle(), - args.inmemory, - ))?; + write_bg(bigwig, bedgraph, args.inmemory, nthreads)?; } } } @@ -117,16 +107,26 @@ pub fn write_bg_singlethreaded( let start = chrom.as_ref().and_then(|_| start); let end = chrom.as_ref().and_then(|_| end); - let chroms: Vec = bigwig.chroms().to_vec(); + let chroms: Vec = if let Some(arg_chrom) = chrom { + let chrom = bigwig.chroms().iter().find(|c| c.name == arg_chrom); + let Some(chrom) = chrom else { + eprintln!("{arg_chrom} not found in file."); + return Ok(()); + }; + vec![chrom.clone()] + } else { + bigwig.chroms().to_vec() + }; let mut writer = io::BufWriter::with_capacity(32 * 1000, out_file); for chrom in chroms { let start = start.unwrap_or(0); let end = end.unwrap_or(chrom.length); let mut values = bigwig.get_interval(&chrom.name, start, end)?; + let mut buf = String::with_capacity(50); // Estimate while let Some(raw_val) = values.next() { let val = raw_val?; - let mut buf = String::with_capacity(50); // Estimate + // Using ryu for f32 to string conversion has a ~15% speedup uwrite!( &mut buf, "{}\t{}\t{}\t{}\n", @@ -137,74 +137,75 @@ pub fn write_bg_singlethreaded( ) .unwrap(); writer.write(buf.as_bytes())?; + buf.clear(); } } Ok(()) } -pub async fn write_bg( +pub fn write_bg( bigwig: BigWigRead, mut out_file: File, - chrom: Option, - start: Option, - end: Option, - runtime: &runtime::Handle, inmemory: bool, + nthreads: usize, ) -> Result<(), BBIReadError> { - let start = chrom.as_ref().and_then(|_| start); - let end = chrom.as_ref().and_then(|_| end); + let runtime = runtime::Builder::new_multi_thread() + .worker_threads(nthreads) + .build() + .unwrap(); + + let mut remaining_chroms = bigwig.chroms().to_vec(); + remaining_chroms.reverse(); + + let mut chrom_files: VecDeque<_> = VecDeque::new(); + + async fn file_future( + mut bigwig: BigWigRead, + chrom: ChromInfo, + mut writer: io::BufWriter>, + ) -> Result<(), BBIReadError> { + let mut buf: String = String::with_capacity(50); // Estimate + for raw_val in bigwig.get_interval(&chrom.name, 0, chrom.length)? { + let val = raw_val?; + // Using ryu for f32 to string conversion has a ~15% speedup + uwrite!( + &mut buf, + "{}\t{}\t{}\t{}\n", + chrom.name, + val.start, + val.end, + ryu::Buffer::new().format(val.value) + ) + .unwrap(); + writer.write(buf.as_bytes())?; + buf.clear(); + } + Ok(()) + } + + loop { + while chrom_files.len() < nthreads { + let Some(chrom) = remaining_chroms.pop() else { + break; + }; - let chroms: Vec = bigwig.chroms().to_vec(); - let chrom_files: Vec)>> = chroms - .into_iter() - .filter(|c| chrom.as_ref().map_or(true, |chrom| &c.name == chrom)) - .map(|chrom| { - let bigwig = bigwig.reopen()?; + let bigbed = bigwig.reopen()?; let (buf, file): (TempFileBuffer, TempFileBufferWriter) = TempFileBuffer::new(inmemory); let writer = io::BufWriter::new(file); - async fn file_future( - mut bigwig: BigWigRead, - chrom: ChromInfo, - mut writer: io::BufWriter>, - start: u32, - end: u32, - ) -> Result<(), BBIReadError> { - for raw_val in bigwig.get_interval(&chrom.name, start, end)? { - let val = raw_val?; - let mut buf = String::with_capacity(50); // Estimate - - // Using ryu for f32 to string conversion has a ~15% speedup - uwrite!( - &mut buf, - "{}\t{}\t{}\t{}\n", - chrom.name.as_str(), - val.start, - val.end, - ryu::Buffer::new().format(val.value) - ) - .unwrap(); - writer.write(buf.as_bytes())?; - } - Ok(()) - } - let start = start.unwrap_or(0); - let end = end.unwrap_or(chrom.length); let handle = runtime - .spawn(file_future(bigwig, chrom, writer, start, end)) + .spawn(file_future(bigbed, chrom, writer)) .map(|f| f.unwrap()); - Ok((handle, buf)) - }) - .collect::>(); + chrom_files.push_back((handle, buf)); + } + + let Some((f, mut buf)) = chrom_files.pop_front() else { + break; + }; - for res in chrom_files { - let (f, mut buf) = res?; buf.switch(out_file); - f.await?; - while !buf.is_real_file_ready() { - tokio::task::yield_now().await; - } + runtime.block_on(f).unwrap(); out_file = buf.await_real_file(); }