Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor fasta reader index usage and allow filelike object use #51

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 53 additions & 52 deletions oxbow/src/fasta.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,62 @@
use arrow::array::{ArrayRef, GenericStringBuilder};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use noodles::core::Region;
use arrow::{
array::{ArrayRef, GenericStringBuilder},
error::ArrowError,
record_batch::RecordBatch,
};
use noodles::fasta;
use noodles::fasta::fai;
use std::sync::Arc;
use std::{
io::{self, BufReader},
iter, str,
sync::Arc,
};

use crate::batch_builder::{write_ipc, BatchBuilder};

type BufferedReader = std::io::BufReader<std::fs::File>;

/// A FASTA reader.
pub struct FastaReader {
reader: fasta::IndexedReader<BufferedReader>,
stream_reader: fasta::Reader<Box<dyn std::io::BufRead>>,
/// Opens an indexed FASTA reader for the file at `path`.
///
/// The underlying noodles builder also reads the accompanying index file and
/// handles (b)gzipped FASTA files.
///
/// # Errors
///
/// Returns the `io::Error` produced by the builder when the reader cannot be
/// constructed from `path`.
pub fn new_from_path(
    path: &str,
) -> io::Result<fasta::IndexedReader<Box<dyn fasta::io::BufReadSeek>>> {
    let builder = fasta::indexed_reader::Builder::default();
    builder.build_from_path(path)
}

impl FastaReader {
/// Creates a Fasta Reader.
pub fn new(path: &str) -> std::io::Result<Self> {
let index = fai::read(format!("{}.fai", path))?;
let file = std::fs::File::open(path)?;
let bufreader = std::io::BufReader::with_capacity(1024 * 1024, file);
let reader = fasta::indexed_reader::Builder::default()
.set_index(index)
.build_from_reader(bufreader)?;
let stream_reader = fasta::reader::Builder::default().build_from_path(path)?;
Ok(Self {
reader,
stream_reader,
})
}

/// Returns the records in the given region as Apache Arrow IPC.
///
/// If the region is `None`, all records are returned.
///
/// # Examples
///
/// ```no_run
/// use oxbow::fasta::FastaReader;
///
/// let mut reader = FastaReader::new("sample.fasta.gz").unwrap();
/// let ipc = reader.records_to_ipc(Some("sq0")).unwrap();
/// ```
pub fn records_to_ipc(&mut self, region: Option<&str>) -> Result<Vec<u8>, ArrowError> {
let batch_builder = FastaBatchBuilder::new(1024)?;

if let Some(region) = region {
let region: Region = region.parse().unwrap();
let query = self.reader.query(&region).unwrap();
let iter = std::iter::once(query);
return write_ipc(iter, batch_builder);
}
/// Builds an indexed FASTA reader from two already-open sources.
///
/// `fasta` supplies the sequence data and `fai` supplies its index. Both are
/// wrapped in a `BufReader`; the index is read in full before the indexed
/// reader is assembled.
///
/// # Errors
///
/// Returns an `io::Error` if the index cannot be read or if the builder fails
/// to construct the reader.
pub fn new_from_reader<R>(fasta: R, fai: R) -> io::Result<fasta::IndexedReader<BufReader<R>>>
where
    R: io::Read,
{
    // Read the index up front; `?` bails out before the builder is touched.
    let mut index_reader = fasta::fai::Reader::new(BufReader::new(fai));
    let index = index_reader.read_index()?;

    let sequence_source = BufReader::new(fasta);
    fasta::indexed_reader::Builder::default()
        .set_index(index)
        .build_from_reader(sequence_source)
}

let records = self.stream_reader.records().map(|r| r.unwrap());
write_ipc(records, batch_builder)
/// Returns the records in the given region as Apache Arrow IPC.
///
/// If the region is `None`, all records are returned.
///
/// # Examples
///
/// ```no_run
/// use oxbow::fasta;
///
/// let mut reader = fasta::new_from_path("sample.fasta.gz").unwrap();
/// let ipc = fasta::records_to_ipc(reader, Some("sq0")).unwrap();
/// ```
pub fn records_to_ipc<R>(
mut indexed_reader: fasta::IndexedReader<R>,
region: Option<&str>,
) -> Result<Vec<u8>, ArrowError>
where
R: fasta::io::BufReadSeek,
{
let batch_builder = FastaBatchBuilder::new(1024)?;
if let Some(region) = region {
let region = region.parse().unwrap();
let query = indexed_reader.query(&region)?;
let record_iter = iter::once(query);
return write_ipc(record_iter, batch_builder);
} else {
let mut reader = fasta::reader::Builder.build_from_reader(indexed_reader.into_inner())?;
let records = reader.records().map(|r| r.unwrap());
return write_ipc(records, batch_builder);
}
}

Expand All @@ -79,8 +81,7 @@ impl BatchBuilder for FastaBatchBuilder {
let seq = record.sequence().as_ref();

self.name.append_value(record.name());
self.sequence
.append_value(std::str::from_utf8(seq).unwrap());
self.sequence.append_value(str::from_utf8(seq).unwrap());
}

fn finish(mut self) -> Result<RecordBatch, ArrowError> {
Expand Down
32 changes: 27 additions & 5 deletions py-oxbow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use oxbow::bam::BamReader;
use oxbow::bcf;
use oxbow::bigbed::BigBedReader;
use oxbow::bigwig::BigWigReader;
use oxbow::fasta::FastaReader;
use oxbow::fasta;
use oxbow::fastq::FastqReader;
use oxbow::vcf;
// use oxbow::cram::CramReader;
Expand All @@ -32,10 +32,32 @@ fn partition_from_index_file(path: &str, chunksize: u64) -> PyObject {
}

#[pyfunction]
fn read_fasta(path: &str, region: Option<&str>) -> PyObject {
let mut reader = FastaReader::new(path).unwrap();
let ipc = reader.records_to_ipc(region).unwrap();
Python::with_gil(|py| PyBytes::new(py, &ipc).into())
/// Reads FASTA records and returns them to Python as Arrow IPC bytes.
///
/// `path_or_file_like` is either a path string or a file-like object:
///
/// * For a string, the indexed reader is built with `fasta::new_from_path`,
///   which uses both the FASTA file and its fai index file.
/// * Otherwise it is treated as file-like, and `index` must be a file-like
///   object that supplies the FASTA index.
///
/// `region` limits the output to one region; `None` returns all records.
///
/// # Panics
///
/// Panics if the argument is neither a string nor a usable file-like object,
/// if a file-like FASTA is given without `index`, or if building the reader
/// or converting the records fails.
fn read_fasta(
    py: Python,
    path_or_file_like: PyObject,
    region: Option<&str>,
    index: Option<PyObject>,
) -> PyObject {
    let ipc = if let Ok(string_ref) = path_or_file_like.downcast::<PyString>(py) {
        // If it's a string, treat it as a path.
        // The underlying builder for the IndexedReader will use both the fasta file and fai index file.
        let reader = fasta::new_from_path(string_ref.to_str().unwrap()).unwrap();
        fasta::records_to_ipc(reader, region).unwrap()
    } else {
        // Otherwise, treat it as file-like.
        let fasta_file_like = match PyFileLikeObject::new(path_or_file_like, true, false, true) {
            Ok(file_like) => file_like,
            Err(_) => panic!("Unknown argument for `path_or_file_like`. Not a file path string, and not a file-like object."),
        };
        // A file-like FASTA has no path to derive the index from, so the
        // index must be passed explicitly.
        let index = index
            .expect("`index` is required when `path_or_file_like` is a file-like object.");
        let index_file_like = match PyFileLikeObject::new(index, true, false, true) {
            Ok(file_like) => file_like,
            Err(_) => panic!("Unknown argument for `index`. Not a file-like object."),
        };
        let reader = fasta::new_from_reader(fasta_file_like, index_file_like).unwrap();
        fasta::records_to_ipc(reader, region).unwrap()
    };
    // The caller already holds the GIL (`py`), so there is no need to re-acquire it.
    PyBytes::new(py, &ipc).into()
}

#[pyfunction]
Expand Down
Loading