Skip to content

Commit

Permalink
Redo bigwigtobedgraph and bigbedtobed code to only queue nthreads tas…
Browse files Browse the repository at this point in the history
…ks, and have a more efficient single-threaded version for bigbedtobed. Add inmemory arg to bigbedtobed, default false.
  • Loading branch information
jackh726 committed Apr 27, 2024
1 parent ea88d5f commit 2b0ea0f
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 124 deletions.
176 changes: 119 additions & 57 deletions bigtools/src/utils/cli/bigbedtobed.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::VecDeque;
use std::error::Error;
use std::fs::File;
use std::io::{self, BufReader, Write};
Expand All @@ -6,6 +7,7 @@ use std::path::Path;
use clap::Parser;
use futures::FutureExt;
use tokio::runtime;
use ufmt::uwrite;

use crate::utils::reopen::{Reopen, SeekableRead};
use crate::utils::streaming_linereader::StreamingLineReader;
Expand Down Expand Up @@ -44,6 +46,12 @@ pub struct BigBedToBedArgs {
#[arg(short = 't', long)]
#[arg(default_value_t = 6)]
pub nthreads: usize,

/// Do not create temporary files for intermediate data. (Only applicable when using multiple threads.)
/// By default, approximately one temporary file will be opened for each core.
#[arg(long)]
#[arg(default_value_t = false)]
pub inmemory: bool,
}

pub fn bigbedtobed(args: BigBedToBedArgs) -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -75,80 +83,130 @@ pub fn bigbedtobed(args: BigBedToBedArgs) -> Result<(), Box<dyn Error>> {
write_bed_from_bed(bigbed, bed, overlap_bed)?;
}
None => {
write_bed(bigbed, bed, nthreads, args.chrom, args.start, args.end)?;
// Right now, we don't offload decompression to separate threads,
// so specifying `chrom` effectively means single-threaded
if nthreads == 1 || args.chrom.is_some() {
write_bed_singlethreaded(bigbed, bed, args.chrom, args.start, args.end)?;
} else {
write_bed(bigbed, bed, args.inmemory, nthreads)?;
}
}
}

Ok(())
}

pub fn write_bed<R: Reopen + SeekableRead + Send + 'static>(
bigbed: BigBedRead<R>,
mut out_file: File,
nthreads: usize,
pub fn write_bed_singlethreaded<R: Reopen + SeekableRead>(
mut bigbed: BigBedRead<R>,
out_file: File,
chrom: Option<String>,
start: Option<u32>,
end: Option<u32>,
) -> Result<(), BBIReadError> {
let runtime = if nthreads == 1 {
runtime::Builder::new_current_thread().build().unwrap()
let start = chrom.as_ref().and_then(|_| start);
let end = chrom.as_ref().and_then(|_| end);

let chroms: Vec<ChromInfo> = if let Some(arg_chrom) = chrom {
let chrom = bigbed.chroms().iter().find(|c| c.name == arg_chrom);
let Some(chrom) = chrom else {
eprintln!("{arg_chrom} not found in file.");
return Ok(());
};
vec![chrom.clone()]
} else {
runtime::Builder::new_multi_thread()
.worker_threads(nthreads)
.build()
.unwrap()
bigbed.chroms().to_vec()
};
let mut writer = io::BufWriter::with_capacity(32 * 1000, out_file);
for chrom in chroms {
let start = start.unwrap_or(0);
let end = end.unwrap_or(chrom.length);
for raw_val in bigbed.get_interval(&chrom.name, start, end)? {
let val = raw_val?;
let end = if !val.rest.is_empty() {
format!("\t{}\n", val.rest)
} else {
"\n".to_string()
};
let mut buf = String::with_capacity(50); // Estimate
uwrite!(
&mut buf,
"{}\t{}\t{}\t{}\n",
chrom.name,
val.start,
val.end,
end,
)
.unwrap();
writer.write(buf.as_bytes())?;
}
}
Ok(())
}

pub fn write_bed<R: Reopen + SeekableRead + Send + 'static>(
bigbed: BigBedRead<R>,
mut out_file: File,
inmemory: bool,
nthreads: usize,
) -> Result<(), BBIReadError> {
let runtime = runtime::Builder::new_multi_thread()
.worker_threads(nthreads)
.build()
.unwrap();

let mut remaining_chroms = bigbed.chroms().to_vec();
remaining_chroms.reverse();

let mut chrom_files: VecDeque<_> = VecDeque::new();

async fn file_future<R: SeekableRead + 'static>(
mut bigbed: BigBedRead<R>,
chrom: ChromInfo,
mut writer: io::BufWriter<TempFileBufferWriter<File>>,
) -> Result<(), BBIReadError> {
let mut buf: String = String::with_capacity(50); // Estimate
for raw_val in bigbed.get_interval(&chrom.name, 0, chrom.length)? {
let val = raw_val?;
let end = if !val.rest.is_empty() {
format!("\t{}\n", val.rest)
} else {
"\n".to_string()
};
uwrite!(
&mut buf,
"{}\t{}\t{}\t{}\n",
chrom.name,
val.start,
val.end,
end,
)
.unwrap();
writer.write(buf.as_bytes())?;
buf.clear();
}
Ok(())
}

loop {
while chrom_files.len() < nthreads {
let Some(chrom) = remaining_chroms.pop() else {
break;
};

let chrom_files: Vec<io::Result<(_, TempFileBuffer<File>)>> = bigbed
.chroms()
.into_iter()
.cloned()
.filter(|c| chrom.as_ref().map_or(true, |chrom| &c.name == chrom))
.map(|chrom| {
let bigbed = bigbed.reopen()?;
let (buf, file): (TempFileBuffer<File>, TempFileBufferWriter<File>) =
TempFileBuffer::new(true);
TempFileBuffer::new(inmemory);
let writer = io::BufWriter::new(file);
async fn file_future<R: SeekableRead + 'static>(
mut bigbed: BigBedRead<R>,
chrom: ChromInfo,
mut writer: io::BufWriter<TempFileBufferWriter<File>>,
start: Option<u32>,
end: Option<u32>,
) -> Result<(), BBIReadError> {
for raw_val in bigbed.get_interval(&chrom.name, 0, chrom.length)? {
let val = raw_val?;
if let Some(start) = start {
if val.start <= start {
continue;
}
}
if let Some(end) = end {
if val.start > end {
continue;
}
}
let end = if !val.rest.is_empty() {
format!("\t{}\n", val.rest)
} else {
"\n".to_string()
};
writer.write_fmt(format_args!(
"{}\t{}\t{}{}",
chrom.name, val.start, val.end, end
))?;
}
Ok(())
}
let handle = runtime
.spawn(file_future(bigbed, chrom, writer, start, end))
.spawn(file_future(bigbed, chrom, writer))
.map(|f| f.unwrap());
Ok((handle, buf))
})
.collect::<Vec<_>>();
chrom_files.push_back((handle, buf));
}

let Some((f, mut buf)) = chrom_files.pop_front() else {
break;
};

for res in chrom_files {
let (f, mut buf) = res.unwrap();
buf.switch(out_file);
runtime.block_on(f).unwrap();
out_file = buf.await_real_file();
Expand All @@ -172,13 +230,17 @@ pub fn write_bed_from_bed<R: Reopen + SeekableRead + Send + 'static>(
let start = split.next().expect("Missing start").parse::<u32>().unwrap();
let end = split.next().expect("Missing end").parse::<u32>().unwrap();
for raw_val in bigbed.get_interval(chrom, start, end)? {
let val = raw_val?;
let mut val = raw_val?;
val.start = val.start.max(start);
val.end = val.end.min(end);
let end = if !val.rest.is_empty() {
format!("\t{}\n", val.rest)
} else {
"\n".to_string()
};
writer.write_fmt(format_args!("{}\t{}\t{}{}", chrom, val.start, val.end, end))?;
let mut buf = String::with_capacity(50); // Estimate
uwrite!(&mut buf, "{}\t{}\t{}\t{}\n", chrom, val.start, val.end, end,).unwrap();
writer.write(buf.as_bytes())?;
}
}

Expand Down
Loading

0 comments on commit 2b0ea0f

Please sign in to comment.