Skip to content

Commit

Permalink
util/alignment: Add async writer (#292)
Browse files Browse the repository at this point in the history
* util/alignment/async: add async alignment writer

* util/alignment/async: fmt

* util/alignment/async: explicitly convert to cram::Record

* util/alignment/async: add Writer::finish

* util/alignment/async: add Writer example

* util/alignment/async: add missing async declaration

* util/alignment/async: change finish to shutdown

* util/alignment/async: tidy up imports
  • Loading branch information
mbhall88 authored Aug 26, 2024
1 parent 802ed03 commit 16f9a13
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 3 deletions.
48 changes: 48 additions & 0 deletions noodles-util/examples/util_alignment_rewrite_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! Rewrites an alignment format to another alignment format asynchronously.
//!
//! The output format is determined from the extension of the destination.
use std::env;

use futures::TryStreamExt;
use noodles_fasta::{self as fasta, repository::adapters::IndexedReader};
use noodles_util::alignment;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
let mut args = env::args().skip(1);

let src = args.next().expect("missing src");
let dst = args.next().expect("missing dst");
let fasta_src = args.next();

let repository = fasta_src
.map(|src| fasta::io::indexed_reader::Builder::default().build_from_path(src))
.transpose()?
.map(IndexedReader::new)
.map(fasta::Repository::new)
.unwrap_or_default();

let mut reader = alignment::r#async::io::reader::Builder::default()
.set_reference_sequence_repository(repository.clone())
.build_from_path(src)
.await?;

let header = reader.read_header().await?;

let mut writer = alignment::r#async::io::writer::Builder::default()
.set_reference_sequence_repository(repository)
.build_from_path(dst)
.await?;

writer.write_header(&header).await?;

while let Some(record) = reader.records(&header).try_next().await? {
writer.write_record(&header, &record).await?;
}

writer.shutdown(&header).await?;

Ok(())
}
3 changes: 2 additions & 1 deletion noodles-util/src/alignment/async/io.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Async alignment format I/O.
pub mod reader;
pub mod writer;

pub use self::reader::Reader;
pub use self::{reader::Reader, writer::Writer};
116 changes: 116 additions & 0 deletions noodles-util/src/alignment/async/io/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//! Async alignment writer.
mod builder;

use noodles_bam as bam;
use noodles_cram as cram;
use noodles_sam as sam;
use tokio::io::{self, AsyncWrite};

pub use self::builder::Builder;

/// An async alignment writer.
pub enum Writer<W: AsyncWrite> {
/// SAM.
Sam(sam::r#async::io::Writer<W>),
/// BAM.
Bam(bam::r#async::io::Writer<W>),
/// CRAM.
Cram(cram::r#async::io::Writer<W>),
}

impl<W> Writer<W>
where
W: AsyncWrite + Unpin,
{
/// Writes a SAM header.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_sam as sam;
/// use noodles_util::alignment::r#async::io::writer::Builder;
/// use tokio::io::{self, AsyncWriteExt};
///
/// let mut writer = Builder::default().build_from_writer(io::sink()).await?;
///
/// let header = sam::Header::default();
/// writer.write_header(&header).await?;
/// # Ok(())
/// # }
/// ```
pub async fn write_header(&mut self, header: &sam::Header) -> io::Result<()> {
match self {
Self::Sam(writer) => writer.write_header(header).await,
Self::Bam(writer) => writer.write_header(header).await,
Self::Cram(writer) => {
writer.write_file_definition().await?;
writer.write_file_header(header).await
}
}
}

/// Writes an alignment record.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_sam as sam;
/// use noodles_util::alignment::r#async::io::writer::Builder;
/// use tokio::io::{self, AsyncWriteExt};
///
/// let mut writer = Builder::default().build_from_writer(io::sink()).await?;
///
/// let header = sam::Header::default();
/// let record = sam::Record::default();
/// writer.write_record(&header, &record).await?;
/// # Ok(())
/// # }
/// ```
pub async fn write_record(
&mut self,
header: &sam::Header,
record: &dyn sam::alignment::Record,
) -> io::Result<()> {
match self {
Self::Sam(writer) => writer.write_alignment_record(header, record).await,
Self::Bam(writer) => writer.write_alignment_record(header, record).await,
Self::Cram(writer) => {
let record = cram::Record::try_from_alignment_record(header, record)?;
writer.write_record(header, record).await
}
}
}

/// Shuts down the alignment format writer.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_sam as sam;
/// use noodles_util::alignment::{self, io::Format};
/// use tokio::io;
///
/// let mut writer = alignment::r#async::io::writer::Builder::default()
/// .set_format(Format::Sam)
/// .build_from_writer(io::sink()).await?;
///
/// let header = sam::Header::default();
/// writer.shutdown(&header).await?;
/// # Ok(())
/// # }
/// ```
pub async fn shutdown(&mut self, header: &sam::Header) -> io::Result<()> {
match self {
Self::Sam(_) => Ok(()),
Self::Bam(writer) => writer.shutdown().await,
Self::Cram(writer) => writer.shutdown(header).await,
}
}
}
187 changes: 187 additions & 0 deletions noodles-util/src/alignment/async/io/writer/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use std::path::Path;

use noodles_bam as bam;
use noodles_bgzf as bgzf;
use noodles_cram as cram;
use noodles_fasta as fasta;
use noodles_sam as sam;
use tokio::{
fs::File,
io::{self, AsyncWrite},
};

use super::Writer;
use crate::alignment::io::{CompressionMethod, Format};

/// An async alignment writer builder.
#[derive(Default)]
pub struct Builder {
compression_method: Option<Option<CompressionMethod>>,
format: Option<Format>,
reference_sequence_repository: fasta::Repository,
}

impl Builder {
/// Sets the compression method.
///
/// By default, the compression method is autodetected on build. This can be used to override
/// it.
///
/// # Examples
///
/// ```
/// use noodles_util::alignment::{r#async::io::writer::Builder, io::CompressionMethod};
/// let builder = Builder::default().set_compression_method(Some(CompressionMethod::Bgzf));
/// ```
pub fn set_compression_method(mut self, compression_method: Option<CompressionMethod>) -> Self {
self.compression_method = Some(compression_method);
self
}

/// Sets the format of the output.
///
/// By default, the format is autodetected on build. This can be used to override it.
///
/// # Examples
///
/// ```
/// use noodles_util::alignment::{r#async::io::writer::Builder, io::Format};
/// let builder = Builder::default().set_format(Format::Sam);
/// ```
pub fn set_format(mut self, format: Format) -> Self {
self.format = Some(format);
self
}

/// Sets the reference sequence repository.
///
/// # Examples
///
/// ```
/// use noodles_fasta as fasta;
/// use noodles_util::alignment::r#async::io::writer::Builder;
/// let repository = fasta::Repository::default();
/// let builder = Builder::default().set_reference_sequence_repository(repository);
/// ```
pub fn set_reference_sequence_repository(
mut self,
reference_sequence_repository: fasta::Repository,
) -> Self {
self.reference_sequence_repository = reference_sequence_repository;
self
}

/// Builds an async alignment writer from a path.
///
/// By default, the format and compression method will be detected from the path extension.
/// This can be overridden by using [`Self::set_format`] and [`Self::set_compression_method`].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_util::alignment::r#async::io::reader::Builder;
/// let reader = Builder::default().build_from_path("sample.bam").await?;
/// # Ok(())
/// # }
/// ```
pub async fn build_from_path<P>(
mut self,
src: P,
) -> io::Result<Writer<Box<dyn AsyncWrite + Unpin>>>
where
P: AsRef<Path>,
{
use crate::alignment::io::writer::builder::{
detect_compression_method_from_path_extension, detect_format_from_path_extension,
};

let src = src.as_ref();

if self.compression_method.is_none() {
self.compression_method = Some(detect_compression_method_from_path_extension(src));
}

if self.format.is_none() {
self.format = detect_format_from_path_extension(src);
}

File::create(src)
.await
.map(|file| self.build_from_writer(file))?
.await
}

/// Builds an async alignment writer from a writer.
///
/// If the format is not set, a default format is used. If the compression method is not set, a
/// default one is determined by the format.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_util::alignment::r#async::io::writer::Builder;
/// use tokio::io;
///
/// let reader = Builder::default().build_from_writer(io::sink()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn build_from_writer<W>(
self,
writer: W,
) -> io::Result<Writer<Box<dyn AsyncWrite + Unpin>>>
where
W: AsyncWrite + Unpin + 'static,
{
use bam::r#async::io::Writer as AsyncBamWriter;
use sam::r#async::io::Writer as AsyncSamWriter;

let format = self.format.unwrap_or(Format::Sam);

let compression_method = match self.compression_method {
Some(compression_method) => compression_method,
None => match format {
Format::Sam | Format::Cram => None,
Format::Bam => Some(CompressionMethod::Bgzf),
},
};

let writer = match (format, compression_method) {
(Format::Sam, None) => {
let inner: Box<dyn AsyncWrite + Unpin> = Box::new(writer);
Writer::Sam(AsyncSamWriter::new(inner))
}
(Format::Sam, Some(CompressionMethod::Bgzf)) => {
let encoder: Box<dyn AsyncWrite + Unpin> = Box::new(bgzf::AsyncWriter::new(writer));
Writer::Sam(AsyncSamWriter::new(encoder))
}
(Format::Bam, None) => {
let inner: Box<dyn AsyncWrite + Unpin> = Box::new(writer);
Writer::Bam(AsyncBamWriter::from(inner))
}
(Format::Bam, Some(CompressionMethod::Bgzf)) => {
let encoder: Box<dyn AsyncWrite + Unpin> = Box::new(bgzf::AsyncWriter::new(writer));
Writer::Bam(AsyncBamWriter::from(encoder))
}
(Format::Cram, None) => {
let inner: Box<dyn AsyncWrite + Unpin> = Box::new(writer);
let inner = cram::r#async::io::writer::Builder::default()
.set_reference_sequence_repository(self.reference_sequence_repository)
.build_with_writer(inner);
Writer::Cram(inner)
}
(Format::Cram, Some(CompressionMethod::Bgzf)) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"CRAM cannot be compressed with BGZF",
));
}
};

Ok(writer)
}
}
4 changes: 2 additions & 2 deletions noodles-util/src/alignment/io/writer/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Builder {
}
}

fn detect_compression_method_from_path_extension<P>(path: P) -> Option<CompressionMethod>
pub(crate) fn detect_compression_method_from_path_extension<P>(path: P) -> Option<CompressionMethod>
where
P: AsRef<Path>,
{
Expand All @@ -196,7 +196,7 @@ where
}
}

fn detect_format_from_path_extension<P>(path: P) -> Option<Format>
pub(crate) fn detect_format_from_path_extension<P>(path: P) -> Option<Format>
where
P: AsRef<Path>,
{
Expand Down

0 comments on commit 16f9a13

Please sign in to comment.