Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async alignment reader #288

Merged
merged 14 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions noodles-util/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## Unreleased

### Added

* util/alignment: Add async reader (`alignment::r#async::io::Reader`)
([#286]).

[#286]: https://github.com/zaeleus/noodles/issues/286

## 0.50.0 - 2024-08-04

### Added
Expand Down
7 changes: 7 additions & 0 deletions noodles-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ alignment = [
async = [
"dep:futures",
"dep:tokio",
"noodles-bam?/async",
"noodles-bcf?/async",
"noodles-bgzf?/async",
"noodles-cram?/async",
"noodles-sam?/async",
"noodles-vcf?/async",
]
variant = [
Expand Down Expand Up @@ -72,6 +75,10 @@ required-features = ["alignment"]
name = "util_alignment_view"
required-features = ["alignment"]

[[example]]
name = "util_alignment_view_async"
required-features = ["alignment", "async"]

[[example]]
name = "util_variant_query"
required-features = ["variant"]
Expand Down
52 changes: 52 additions & 0 deletions noodles-util/examples/util_alignment_view_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Prints an alignment file in the SAM format.
//!
//! Reference sequences in the FASTA format are only required for CRAM inputs that require them.
//!
//! The result matches the output of `samtools view --no-PG --with-header [--reference <fasta-src>]
//! <src>`.

use std::env;

use futures::TryStreamExt;
use noodles_fasta::{self as fasta, repository::adapters::IndexedReader};
use noodles_sam as sam;
use noodles_util::alignment;
use tokio::io::{self, AsyncWriteExt};

/// Reads an alignment file (or stdin when `src` is `-`) and writes it to stdout as SAM.
///
/// Usage: `util_alignment_view_async <src> [fasta-src]`
///
/// The optional second argument is the path to an indexed FASTA file, which is used as the
/// reference sequence repository for the reader.
///
/// # Errors
///
/// Returns any I/O error raised while opening the inputs, reading the header or records, or
/// writing the output.
///
/// # Panics
///
/// Panics if the `src` argument is missing.
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut args = env::args().skip(1);

    let src = args.next().expect("missing src");
    let fasta_src = args.next();

    let mut builder = alignment::r#async::io::reader::Builder::default();

    if let Some(fasta_src) = fasta_src {
        let repository = fasta::io::indexed_reader::Builder::default()
            .build_from_path(fasta_src)
            .map(IndexedReader::new)
            .map(fasta::Repository::new)?;

        builder = builder.set_reference_sequence_repository(repository);
    }

    let mut reader = if src == "-" {
        builder.build_from_reader(io::stdin()).await?
    } else {
        builder.build_from_path(src).await?
    };

    let header = reader.read_header().await?;

    let mut writer = sam::r#async::io::Writer::new(io::stdout());
    writer.write_header(&header).await?;

    // Build the record stream once and drive it to completion. `Reader::records` constructs
    // (and boxes) a new stream on every call, so calling it in the loop condition would
    // allocate per record and drop the previous stream after each item, which may discard
    // any state the underlying format stream keeps between records.
    let mut records = reader.records(&header);

    while let Some(record) = records.try_next().await? {
        writer.write_alignment_record(&header, &record).await?;
    }

    // Flush and close stdout so buffered output is not lost when the runtime shuts down.
    writer.get_mut().shutdown().await?;

    Ok(())
}
3 changes: 3 additions & 0 deletions noodles-util/src/alignment.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
//! Alignment format utilities.

#[cfg(feature = "async")]
pub mod r#async;

pub mod io;
pub mod iter;
3 changes: 3 additions & 0 deletions noodles-util/src/alignment/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Async alignment format utilities.

pub mod io;
5 changes: 5 additions & 0 deletions noodles-util/src/alignment/async/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Async alignment format I/O.

pub mod reader;

pub use self::reader::Reader;
100 changes: 100 additions & 0 deletions noodles-util/src/alignment/async/io/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//! Async alignment reader.

mod builder;

use std::pin::Pin;

use futures::{Stream, StreamExt};
use noodles_bam as bam;
use noodles_cram as cram;
use noodles_sam as sam;
use tokio::io::{self, AsyncBufRead};

pub use self::builder::Builder;

/// An async alignment reader.
///
/// This is a thin wrapper that holds one of the format-specific async readers. `R` is the type
/// of the underlying reader shared by all variants.
pub enum Reader<R> {
/// SAM: wraps the async SAM reader.
Sam(sam::r#async::io::Reader<R>),
/// BAM: wraps the async BAM reader.
Bam(bam::r#async::io::Reader<R>),
/// CRAM: wraps the async CRAM reader.
Cram(cram::r#async::io::Reader<R>),
}

impl<R> Reader<R>
where
    R: AsyncBufRead + Unpin,
{
    /// Reads the header as a SAM header.
    ///
    /// The call is forwarded to the reader of the wrapped format (SAM, BAM, or CRAM).
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> tokio::io::Result<()> {
    /// use noodles_util::alignment::r#async::io::reader::Builder;
    /// use tokio::io;
    /// let mut reader = Builder::default().build_from_reader(io::empty()).await?;
    /// let _header = reader.read_header().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn read_header(&mut self) -> io::Result<sam::Header> {
        match self {
            Self::Sam(inner) => inner.read_header().await,
            Self::Bam(inner) => inner.read_header().await,
            Self::Cram(inner) => inner.read_header().await,
        }
    }

    /// Returns a stream over records starting from the current stream position.
    ///
    /// Each item is a boxed, format-independent alignment record. The given header is only
    /// passed on to the CRAM reader; the SAM and BAM readers do not receive it.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> tokio::io::Result<()> {
    /// use futures::TryStreamExt;
    /// use noodles_util::alignment::r#async::io::reader::Builder;
    /// use tokio::io;
    ///
    /// let mut reader = Builder::default().build_from_reader(io::empty()).await?;
    /// let header = reader.read_header().await?;
    ///
    /// let mut records = reader.records(&header);
    ///
    /// while let Some(record) = records.try_next().await? {
    ///     // ...
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn records<'r, 'h: 'r>(
        &'r mut self,
        header: &'h sam::Header,
    ) -> impl Stream<Item = io::Result<Box<dyn sam::alignment::Record>>> + 'r {
        type BoxedRecord = Box<dyn sam::alignment::Record>;

        // Erases the concrete record type of a successfully read record.
        fn erase<T>(result: io::Result<T>) -> io::Result<BoxedRecord>
        where
            T: sam::alignment::Record + 'static,
        {
            result.map(|record| Box::new(record) as BoxedRecord)
        }

        // The three arms produce different stream types, so they are unified behind a pinned
        // trait object; the resulting type is long but only named here.
        #[allow(clippy::type_complexity)]
        let stream: Pin<Box<dyn Stream<Item = io::Result<BoxedRecord>> + 'r>> = match self {
            Self::Sam(inner) => Box::pin(inner.records().map(erase)),
            Self::Bam(inner) => Box::pin(inner.records().map(erase)),
            Self::Cram(inner) => Box::pin(inner.records(header).map(erase)),
        };

        stream
    }
}
175 changes: 175 additions & 0 deletions noodles-util/src/alignment/async/io/reader/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::path::Path;

use noodles_bam as bam;
use noodles_bgzf as bgzf;
use noodles_cram as cram;
use noodles_fasta as fasta;
use noodles_sam as sam;
use tokio::{
fs::File,
io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, BufReader},
};

use super::Reader;
use crate::alignment::io::{CompressionMethod, Format};

/// An async alignment reader builder.
///
/// The default builder autodetects both the compression method and the format when the reader
/// is built, and uses a default (empty) reference sequence repository.
#[derive(Default)]
pub struct Builder {
// Outer `None`: autodetect on build. `Some(None)`: the input is explicitly treated as
// uncompressed. `Some(Some(_))`: the given compression method is used.
compression_method: Option<Option<CompressionMethod>>,
// `None`: autodetect on build.
format: Option<Format>,
// Passed to the CRAM reader builder when the input is CRAM; not used for SAM or BAM.
reference_sequence_repository: fasta::Repository,
}

impl Builder {
/// Sets the compression method.
///
/// By default, the compression method is autodetected on build. This can be used to override
/// it.
///
/// # Examples
///
/// ```
/// use noodles_util::alignment::{r#async::io::reader::Builder, io::CompressionMethod};
/// let _builder = Builder::default().set_compression_method(Some(CompressionMethod::Bgzf));
/// ```
pub fn set_compression_method(mut self, compression_method: Option<CompressionMethod>) -> Self {
self.compression_method = Some(compression_method);
self
}

/// Sets the format of the input.
///
/// By default, the format is autodetected on build. This can be used to override it.
///
/// # Examples
///
/// ```
/// use noodles_util::alignment::{r#async::io::reader::Builder, io::Format};
/// let _builder = Builder::default().set_format(Format::Sam);
/// ```
pub fn set_format(mut self, format: Format) -> Self {
self.format = Some(format);
self
}

/// Sets the reference sequence repository.
///
/// # Examples
///
/// ```
/// use noodles_fasta as fasta;
/// use noodles_util::alignment::{r#async::io::reader::Builder, io::Format};
/// let repository = fasta::Repository::default();
/// let _builder = Builder::default().set_reference_sequence_repository(repository);
/// ```
pub fn set_reference_sequence_repository(
mut self,
reference_sequence_repository: fasta::Repository,
) -> Self {
self.reference_sequence_repository = reference_sequence_repository;
self
}

/// Builds an async alignment reader from a path.
///
/// By default, the format and compression method will be autodetected. This can be overridden
/// by using [`Self::set_format`] and [`Self::set_compression_method`].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_util::alignment::r#async::io::reader::Builder;
/// let _reader = Builder::default().build_from_path("sample.bam").await?;
/// # Ok(())
/// # }
/// ```
pub async fn build_from_path<P>(
self,
src: P,
) -> io::Result<Reader<Box<dyn AsyncBufRead + Unpin>>>
where
P: AsRef<Path>,
{
let file = File::open(src).await?;
self.build_from_reader(file).await
}

/// Builds an async alignment reader from a reader.
///
/// By default, the format and compression method will be autodetected. This can be overridden
/// by using [`Self::set_format`] and [`Self::set_compression_method`].
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_util::alignment::r#async::io::reader::Builder;
/// use tokio::io;
/// let reader = Builder::default().build_from_reader(io::empty()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn build_from_reader<R>(
self,
reader: R,
) -> io::Result<Reader<Box<dyn AsyncBufRead + Unpin>>>
where
R: AsyncRead + Unpin + 'static,
{
use crate::alignment::io::reader::builder::{detect_compression_method, detect_format};

let mut reader = BufReader::new(reader);

let compression_method = match self.compression_method {
Some(compression_method) => compression_method,
None => {
let mut src = reader.fill_buf().await?;
detect_compression_method(&mut src)?
}
};

let format = match self.format {
Some(format) => format,
None => {
let mut src = reader.fill_buf().await?;
detect_format(&mut src, compression_method)?
}
};

let reader: Box<dyn AsyncBufRead + Unpin> = match (format, compression_method) {
(Format::Sam, None) => Box::new(reader),
(Format::Sam, Some(CompressionMethod::Bgzf)) => {
Box::new(bgzf::AsyncReader::new(reader))
}
(Format::Bam, None) => Box::new(reader),
(Format::Bam, Some(CompressionMethod::Bgzf)) => {
Box::new(bgzf::AsyncReader::new(reader))
}
(Format::Cram, None) => {
let inner: Box<dyn AsyncBufRead + Unpin> = Box::new(reader);
let inner = cram::r#async::io::reader::Builder::default()
.set_reference_sequence_repository(self.reference_sequence_repository)
.build_from_reader(inner);
return Ok(Reader::Cram(inner));
}
(Format::Cram, Some(CompressionMethod::Bgzf)) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"CRAM cannot be compressed with BGZF",
));
}
};

let reader: Reader<Box<dyn AsyncBufRead + Unpin>> = match format {
Format::Sam => Reader::Sam(sam::r#async::io::Reader::new(reader)),
Format::Bam => Reader::Bam(bam::r#async::io::Reader::from(reader)),
Format::Cram => unreachable!(), // Handled above
};

Ok(reader)
}
}
3 changes: 2 additions & 1 deletion noodles-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![warn(missing_docs)]

//! **noodles-util** are utilities for working with noodles.
//! **noodles-util** are utilities for working with noodles. Currently, this consists of a unified
//! interface for reading and writing [alignment] (BAM/CRAM/SAM) and [variant] (VCF/BCF) data.

#[cfg(feature = "alignment")]
pub mod alignment;
Expand Down