Skip to content

Commit

Permalink
util/alignment: Add async reader (#288)
Browse files Browse the repository at this point in the history
* [WIP] add skeleton code for async reader

* util/alignment/async: add Reader

* util/alignment/async: add Reader Builder

* util/alignment/async: fix Reader Builder

* util/alignment/async: add doc examples

* util/alignment/async: add async view example

* util/alignment: Organize imports

* util/alignment/async/io: Update descriptions

* util/alignment/async/io/reader: Simplify examples

* util/examples/alignment_view_async: Normalize example

This is to better match `util_variant_view_async`.

* util: Add required features for util_alignment_view_async example

* util: Sort async dependencies

* util/alignment/async/io/reader/builder: Import Path

* util/changelog: Add entry for async reader

---------

Co-authored-by: Michael Macias <[email protected]>
  • Loading branch information
mbhall88 and zaeleus authored Aug 15, 2024
1 parent bb2938f commit bba0ee5
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 1 deletion.
9 changes: 9 additions & 0 deletions noodles-util/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## Unreleased

### Added

* util/alignment: Add async reader (`alignment::r#async::io::Reader`)
([#286]).

[#286]: https://github.com/zaeleus/noodles/issues/286

## 0.50.0 - 2024-08-04

### Added
Expand Down
7 changes: 7 additions & 0 deletions noodles-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ alignment = [
async = [
"dep:futures",
"dep:tokio",
"noodles-bam?/async",
"noodles-bcf?/async",
"noodles-bgzf?/async",
"noodles-cram?/async",
"noodles-sam?/async",
"noodles-vcf?/async",
]
variant = [
Expand Down Expand Up @@ -72,6 +75,10 @@ required-features = ["alignment"]
name = "util_alignment_view"
required-features = ["alignment"]

[[example]]
name = "util_alignment_view_async"
required-features = ["alignment", "async"]

[[example]]
name = "util_variant_query"
required-features = ["variant"]
Expand Down
52 changes: 52 additions & 0 deletions noodles-util/examples/util_alignment_view_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Prints an alignment file in the SAM format using the async alignment reader.
//!
//! A FASTA reference (the optional second argument) is only needed when the input is CRAM and
//! decoding it requires external reference sequences.
//!
//! The result matches the output of `samtools view --no-PG --with-header [--reference <fasta-src>]
//! <src>`.
use std::env;

use futures::TryStreamExt;
use noodles_fasta::{self as fasta, repository::adapters::IndexedReader};
use noodles_sam as sam;
use noodles_util::alignment;
use tokio::io::{self, AsyncWriteExt};

/// Reads an alignment file (or stdin when `src` is `-`) and writes it to stdout as SAM.
///
/// Arguments:
///
/// 1. `src`: the input path, or `-` to read from stdin.
/// 2. `fasta_src` (optional): the path to an indexed FASTA file used as the reference sequence
///    repository.
///
/// # Errors
///
/// Returns any I/O error raised while opening the inputs, reading the header or records, or
/// writing the output.
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut args = env::args().skip(1);

    let src = args.next().expect("missing src");
    let fasta_src = args.next();

    let mut builder = alignment::r#async::io::reader::Builder::default();

    if let Some(fasta_src) = fasta_src {
        let repository = fasta::io::indexed_reader::Builder::default()
            .build_from_path(fasta_src)
            .map(IndexedReader::new)
            .map(fasta::Repository::new)?;

        builder = builder.set_reference_sequence_repository(repository);
    }

    let mut reader = if src == "-" {
        builder.build_from_reader(io::stdin()).await?
    } else {
        builder.build_from_path(src).await?
    };

    let header = reader.read_header().await?;

    let mut writer = sam::r#async::io::Writer::new(io::stdout());
    writer.write_header(&header).await?;

    // Create the record stream once and drain it. `records` boxes a new stream on every call, so
    // building one per loop iteration allocates per record, and dropping a stream after a single
    // item could discard records it has already decoded but not yet yielded.
    let mut records = reader.records(&header);

    while let Some(record) = records.try_next().await? {
        writer.write_alignment_record(&header, &record).await?;
    }

    // Flush and close stdout so buffered output is not lost when the runtime exits.
    writer.get_mut().shutdown().await?;

    Ok(())
}
3 changes: 3 additions & 0 deletions noodles-util/src/alignment.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
//! Alignment format utilities.
#[cfg(feature = "async")]
pub mod r#async;

pub mod io;
pub mod iter;
3 changes: 3 additions & 0 deletions noodles-util/src/alignment/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Async alignment format utilities.
pub mod io;
5 changes: 5 additions & 0 deletions noodles-util/src/alignment/async/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Async alignment format I/O.
pub mod reader;

pub use self::reader::Reader;
100 changes: 100 additions & 0 deletions noodles-util/src/alignment/async/io/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//! Async alignment reader.
mod builder;

use std::pin::Pin;

use futures::{Stream, StreamExt};
use noodles_bam as bam;
use noodles_cram as cram;
use noodles_sam as sam;
use tokio::io::{self, AsyncBufRead};

pub use self::builder::Builder;

/// An async alignment reader.
///
/// This wraps one of the format-specific async readers (SAM, BAM, or CRAM) so that callers can
/// read a header and records without knowing the concrete input format.
pub enum Reader<R> {
/// A reader for SAM input.
Sam(sam::r#async::io::Reader<R>),
/// A reader for BAM input.
Bam(bam::r#async::io::Reader<R>),
/// A reader for CRAM input.
Cram(cram::r#async::io::Reader<R>),
}

impl<R> Reader<R>
where
    R: AsyncBufRead + Unpin,
{
    /// Reads the SAM header using the wrapped format-specific reader.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> tokio::io::Result<()> {
    /// use noodles_util::alignment::r#async::io::reader::Builder;
    /// use tokio::io;
    /// let mut reader = Builder::default().build_from_reader(io::empty()).await?;
    /// let _header = reader.read_header().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn read_header(&mut self) -> io::Result<sam::Header> {
        let header = match self {
            Self::Sam(inner) => inner.read_header().await?,
            Self::Bam(inner) => inner.read_header().await?,
            Self::Cram(inner) => inner.read_header().await?,
        };

        Ok(header)
    }

    /// Returns a stream over records starting from the current stream position.
    ///
    /// Each record is yielded as a boxed [`sam::alignment::Record`] trait object, regardless of
    /// the underlying format. The given header is only passed on to the CRAM reader.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> tokio::io::Result<()> {
    /// use futures::TryStreamExt;
    /// use noodles_util::alignment::r#async::io::reader::Builder;
    /// use tokio::io;
    ///
    /// let mut reader = Builder::default().build_from_reader(io::empty()).await?;
    /// let header = reader.read_header().await?;
    ///
    /// let mut records = reader.records(&header);
    ///
    /// while let Some(record) = records.try_next().await? {
    ///     // ...
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn records<'r, 'h: 'r>(
        &'r mut self,
        header: &'h sam::Header,
    ) -> impl Stream<Item = io::Result<Box<dyn sam::alignment::Record>>> + 'r {
        type BoxedRecord = Box<dyn sam::alignment::Record>;
        type RecordStream<'a> = Pin<Box<dyn Stream<Item = io::Result<BoxedRecord>> + 'a>>;

        // Erases the concrete record type so all formats share one item type.
        fn erase<T>(record: T) -> BoxedRecord
        where
            T: sam::alignment::Record + 'static,
        {
            Box::new(record)
        }

        let stream: RecordStream<'r> = match self {
            Self::Sam(inner) => Box::pin(inner.records().map(|result| result.map(erase))),
            Self::Bam(inner) => Box::pin(inner.records().map(|result| result.map(erase))),
            Self::Cram(inner) => Box::pin(inner.records(header).map(|result| result.map(erase))),
        };

        stream
    }
}
175 changes: 175 additions & 0 deletions noodles-util/src/alignment/async/io/reader/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::path::Path;

use noodles_bam as bam;
use noodles_bgzf as bgzf;
use noodles_cram as cram;
use noodles_fasta as fasta;
use noodles_sam as sam;
use tokio::{
fs::File,
io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, BufReader},
};

use super::Reader;
use crate::alignment::io::{CompressionMethod, Format};

/// An async alignment reader builder.
///
/// The builder collects optional overrides for the compression method and format, plus a
/// reference sequence repository, and then constructs a [`Reader`] from a path or a reader.
#[derive(Default)]
pub struct Builder {
// Outer `None`: autodetect the compression method on build. `Some(None)`: the caller explicitly
// requested no decompression layer. `Some(Some(_))`: the caller chose a compression method.
compression_method: Option<Option<CompressionMethod>>,
// `None`: autodetect the format on build.
format: Option<Format>,
// Handed to the CRAM reader builder when the input format is CRAM.
reference_sequence_repository: fasta::Repository,
}

impl Builder {
/// Sets the compression method.
///
/// By default, the compression method is autodetected on build. This can be used to override
/// it.
///
/// # Examples
///
/// ```
/// use noodles_util::alignment::{r#async::io::reader::Builder, io::CompressionMethod};
/// let _builder = Builder::default().set_compression_method(Some(CompressionMethod::Bgzf));
/// ```
pub fn set_compression_method(mut self, compression_method: Option<CompressionMethod>) -> Self {
self.compression_method = Some(compression_method);
self
}

/// Sets the format of the input.
///
/// By default, the format is autodetected on build. This can be used to override it.
///
/// # Examples
///
/// ```
/// use noodles_util::alignment::{r#async::io::reader::Builder, io::Format};
/// let _builder = Builder::default().set_format(Format::Sam);
/// ```
pub fn set_format(mut self, format: Format) -> Self {
self.format = Some(format);
self
}

/// Sets the reference sequence repository.
///
/// # Examples
///
/// ```
/// use noodles_fasta as fasta;
/// use noodles_util::alignment::{r#async::io::reader::Builder, io::Format};
/// let repository = fasta::Repository::default();
/// let _builder = Builder::default().set_reference_sequence_repository(repository);
/// ```
pub fn set_reference_sequence_repository(
mut self,
reference_sequence_repository: fasta::Repository,
) -> Self {
self.reference_sequence_repository = reference_sequence_repository;
self
}

/// Builds an async alignment reader from a path.
///
/// By default, the format and compression method will be autodetected. This can be overridden
/// by using [`Self::set_format`] and [`Self::set_compression_method`].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_util::alignment::r#async::io::reader::Builder;
/// let _reader = Builder::default().build_from_path("sample.bam").await?;
/// # Ok(())
/// # }
/// ```
pub async fn build_from_path<P>(
self,
src: P,
) -> io::Result<Reader<Box<dyn AsyncBufRead + Unpin>>>
where
P: AsRef<Path>,
{
let file = File::open(src).await?;
self.build_from_reader(file).await
}

/// Builds an async alignment reader from a reader.
///
/// By default, the format and compression method will be autodetected. This can be overridden
/// by using [`Self::set_format`] and [`Self::set_compression_method`].
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_util::alignment::r#async::io::reader::Builder;
/// use tokio::io;
/// let reader = Builder::default().build_from_reader(io::empty()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn build_from_reader<R>(
self,
reader: R,
) -> io::Result<Reader<Box<dyn AsyncBufRead + Unpin>>>
where
R: AsyncRead + Unpin + 'static,
{
use crate::alignment::io::reader::builder::{detect_compression_method, detect_format};

let mut reader = BufReader::new(reader);

let compression_method = match self.compression_method {
Some(compression_method) => compression_method,
None => {
let mut src = reader.fill_buf().await?;
detect_compression_method(&mut src)?
}
};

let format = match self.format {
Some(format) => format,
None => {
let mut src = reader.fill_buf().await?;
detect_format(&mut src, compression_method)?
}
};

let reader: Box<dyn AsyncBufRead + Unpin> = match (format, compression_method) {
(Format::Sam, None) => Box::new(reader),
(Format::Sam, Some(CompressionMethod::Bgzf)) => {
Box::new(bgzf::AsyncReader::new(reader))
}
(Format::Bam, None) => Box::new(reader),
(Format::Bam, Some(CompressionMethod::Bgzf)) => {
Box::new(bgzf::AsyncReader::new(reader))
}
(Format::Cram, None) => {
let inner: Box<dyn AsyncBufRead + Unpin> = Box::new(reader);
let inner = cram::r#async::io::reader::Builder::default()
.set_reference_sequence_repository(self.reference_sequence_repository)
.build_from_reader(inner);
return Ok(Reader::Cram(inner));
}
(Format::Cram, Some(CompressionMethod::Bgzf)) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"CRAM cannot be compressed with BGZF",
));
}
};

let reader: Reader<Box<dyn AsyncBufRead + Unpin>> = match format {
Format::Sam => Reader::Sam(sam::r#async::io::Reader::new(reader)),
Format::Bam => Reader::Bam(bam::r#async::io::Reader::from(reader)),
Format::Cram => unreachable!(), // Handled above
};

Ok(reader)
}
}
3 changes: 2 additions & 1 deletion noodles-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![warn(missing_docs)]

//! **noodles-util** are utilities for working with noodles.
//! **noodles-util** are utilities for working with noodles. Currently, this consists of a unified
//! interface for reading and writing [alignment] (BAM/CRAM/SAM) and [variant] (VCF/BCF) data.
#[cfg(feature = "alignment")]
pub mod alignment;
Expand Down

0 comments on commit bba0ee5

Please sign in to comment.