Skip to content

Commit

Permalink
build: update exon to v16 (#118)
Browse files Browse the repository at this point in the history
* build: update exon
* fix: fix tests
* build: upgrading exon
  • Loading branch information
tshauck authored Apr 17, 2024
1 parent 9887f5c commit ceafa47
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 83 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ crate-type = ["cdylib"]
name = "biobear"

[dependencies]
arrow = { version = "50.0.0", features = ["pyarrow"] }
datafusion = "36"
exon = { version = "0.15.0", features = ["all"] }
arrow = { version = "51.0.0", features = ["pyarrow"] }
datafusion = "37"
exon = { version = "0.16.4", features = ["default"] }
pyo3 = "0.20"
tokio = { version = "1", features = ["rt"] }
noodles = "0.68"

[profile.release]
codegen-units = 1
Expand Down
10 changes: 9 additions & 1 deletion python/biobear/bcf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import pyarrow as pa


from biobear.reader import Reader

from .biobear import _ExonReader, _BCFIndexedReader

from warnings import warn


class BCFReader(Reader):
"""A BCF File Reader.
Expand All @@ -38,6 +39,13 @@ def __init__(self, path: os.PathLike):
path (Path): Path to the BCF file.
"""

# show a warning that this is deprecated
warn(
"BCFReader is deprecated, use ExonSessionContext instead",
DeprecationWarning,
)

self._bcf_reader = _ExonReader(str(path), "BCF", None)

@property
Expand Down
63 changes: 0 additions & 63 deletions python/tests/test_bcf_reader.py

This file was deleted.

6 changes: 5 additions & 1 deletion python/tests/test_fastq_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@ def test_fastq_reader_to_pandas():

assert len(df) == 2


@pytest.mark.skipif(
not importlib.util.find_spec("polars"), reason="polars not installed"
)
def test_fastq_bgzip_reader():
fastq_reader = FastqReader(DATA / "fake_fastq_file.fastq.gz", compression=Compression.GZIP)
fastq_reader = FastqReader(
DATA / "fake_fastq_file.fastq.gz", compression=Compression.GZIP
)
df = fastq_reader.to_polars()

assert len(df) == 20_000


@pytest.mark.skipif(
not importlib.util.find_spec("polars"), reason="polars not installed"
)
Expand Down
42 changes: 36 additions & 6 deletions python/tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

import pytest

from biobear import connect, FASTQReadOptions, FASTAReadOptions, FileCompressionType
from biobear.biobear import (
connect,
FASTQReadOptions,
FASTAReadOptions,
FileCompressionType,
BCFReadOptions,
)

DATA = Path(__file__).parent / "data"

Expand Down Expand Up @@ -160,7 +166,7 @@ def test_read_fasta_gz():
fasta_path = DATA / "test.fa.gz"

options = FASTAReadOptions(
file_extension="fa.gz", file_compression_type=FileCompressionType.GZIP
file_extension="fa", file_compression_type=FileCompressionType.GZIP
)
df = session.read_fasta_file(str(fasta_path), options=options).to_polars()

Expand Down Expand Up @@ -239,9 +245,7 @@ def test_execute(tmp_path):
query = f"CREATE EXTERNAL TABLE gff_file STORED AS GFF LOCATION '{gff_path}'"
session.execute(query)

copy_query = (
f"COPY (SELECT seqname FROM gff_file) TO '{output_path}' (FORMAT PARQUET)"
)
copy_query = f"COPY (SELECT seqname FROM gff_file) TO '{output_path}'"
session.execute(copy_query)

assert output_path.exists()
Expand Down Expand Up @@ -283,9 +287,35 @@ def test_copy_to_s3():
s3_input_path = "s3://test-bucket/test.fasta"
parquet_output = "s3://parquet-bucket/test.parquet"

query = f"COPY (SELECT * FROM fasta_scan('{s3_input_path}')) TO '{parquet_output}' (FORMAT PARQUET)"
query = f"COPY (SELECT * FROM fasta_scan('{s3_input_path}')) TO '{parquet_output}'"

session.register_object_store_from_url(parquet_output)

# Should not raise an exception
session.execute(query)


def test_read_bcf_file():
    """Reading index.bcf through a session yields the expected number of rows."""
    bcf_file = DATA / "index.bcf"

    table = connect().read_bcf_file(bcf_file.as_posix()).to_arrow()

    assert len(table) == 621


@pytest.mark.skip("Not implemented yet")
def test_bcf_indexed_reader_query():
    """Read index.bcf with the region option set to "1" and count the streamed rows."""
    session = connect()
    region_options = BCFReadOptions(region="1")

    result = session.read_bcf_file(
        (DATA / "index.bcf").as_posix(), options=region_options
    )
    batches = result.to_arrow_record_batch_reader()

    row_count = sum(batch.num_rows for batch in batches)
    assert row_count == 191
11 changes: 10 additions & 1 deletion src/bcf_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow::pyarrow::IntoPyArrow;
use datafusion::prelude::{SessionConfig, SessionContext};
use exon::datasources::bcf::table_provider::ListingBCFTableOptions;
use exon::ffi::DataFrameRecordBatchStream;
use noodles::core::Region;
use pyo3::prelude::*;
use tokio::runtime::Runtime;

use exon::ExonSessionExt;

use std::io;
use std::str::FromStr;
use std::sync::Arc;

use crate::error::BioBearError;
Expand Down Expand Up @@ -68,8 +71,14 @@ impl BCFIndexedReader {

let ctx = SessionContext::new_with_config(config);

let region = Region::from_str(region).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("Error parsing region: {e}"))
})?;

let options = ListingBCFTableOptions::default().with_regions(vec![region]);

let df = self._runtime.block_on(async {
match ctx.query_bcf_file(self.path.as_str(), region).await {
match ctx.read_bcf(self.path.as_str(), options).await {
Ok(df) => Ok(df),
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
Expand Down
55 changes: 55 additions & 0 deletions src/datasources/bcf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 WHERE TRUE Technologies.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{str::FromStr, vec};

use exon::datasources::bcf::table_provider::ListingBCFTableOptions;
use noodles::core::Region;
use pyo3::{pyclass, pymethods};

use crate::error::{BioBearError, BioBearResult};

/// Options for reading a BCF file, exposed to Python through `#[pyclass]`.
///
/// The derived `Default` leaves `region` as `None`.
#[pyclass]
#[derive(Default, Debug, Clone)]
pub struct BCFReadOptions {
/// Optional parsed region; when present it is forwarded to
/// `ListingBCFTableOptions::with_regions` as a single-element list.
region: Option<Region>,
}

#[pymethods]
impl BCFReadOptions {
    /// Create a new set of BCF read options.
    ///
    /// `region` is optional: when it is omitted (or passed as `None` from
    /// Python) no region is stored. When given, the string is parsed with
    /// `Region::from_str`.
    ///
    /// # Errors
    ///
    /// Returns `BioBearError::ParserError` if the region string cannot be
    /// parsed.
    #[new]
    // `region = None` makes the argument genuinely optional on the Python
    // side; without a default, an explicit pyo3 signature treats it as
    // required even though the Rust type is `Option<String>`.
    #[pyo3(signature = (/, region = None))]
    fn try_new(region: Option<String>) -> BioBearResult<Self> {
        let region = region
            .map(|r| Region::from_str(&r))
            // Option<Result<_, _>> -> Result<Option<_>, _> so `?` can be used.
            .transpose()
            .map_err(|e| {
                BioBearError::ParserError(format!("Couldn\'t parse region error {}", e))
            })?;

        Ok(Self { region })
    }
}

impl From<BCFReadOptions> for ListingBCFTableOptions {
fn from(options: BCFReadOptions) -> Self {
let region = if let Some(r) = options.region {
vec![r]
} else {
vec![]
};

ListingBCFTableOptions::default().with_regions(region)
}
}
2 changes: 1 addition & 1 deletion src/datasources/fasta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ impl FASTAReadOptions {
impl From<FASTAReadOptions> for ListingFASTATableOptions {
fn from(options: FASTAReadOptions) -> Self {
ListingFASTATableOptions::new(options.file_compression_type)
.with_file_extension(options.file_extension)
.with_some_file_extension(Some(&options.file_extension))
}
}
2 changes: 1 addition & 1 deletion src/datasources/fastq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,6 @@ impl FASTQReadOptions {
impl From<FASTQReadOptions> for ListingFASTQTableOptions {
fn from(options: FASTQReadOptions) -> Self {
ListingFASTQTableOptions::new(options.file_compression_type)
.with_file_extension(options.file_extension)
.with_some_file_extension(Some(&options.file_extension))
}
}
1 change: 1 addition & 0 deletions src/datasources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod bcf;
pub mod fasta;
pub mod fastq;
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use arrow::error::ArrowError;
use datafusion::{error::DataFusionError, sql::sqlparser::parser::ParserError};
use exon::error::ExonError;
use exon::ExonError;
use pyo3::PyErr;

#[derive(Debug)]
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ fn biobear(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<file_compression_type::FileCompressionType>()?;
m.add_class::<datasources::fastq::FASTQReadOptions>()?;
m.add_class::<datasources::fasta::FASTAReadOptions>()?;
m.add_class::<datasources::bcf::BCFReadOptions>()?;

m.add_function(wrap_pyfunction!(session_context::connect, m)?)?;
m.add_function(wrap_pyfunction!(session_context::new_context, m)?)?;

Ok(())
}
30 changes: 26 additions & 4 deletions src/session_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use exon::{ExonRuntimeEnvExt, ExonSessionExt};

use pyo3::prelude::*;

use crate::datasources::bcf::BCFReadOptions;
use crate::datasources::fasta::FASTAReadOptions;
use crate::datasources::fastq::FASTQReadOptions;
use crate::error;
Expand Down Expand Up @@ -51,9 +52,25 @@ impl ExonSessionContext {
options: Option<FASTQReadOptions>,
py: Python,
) -> PyResult<PyExecutionResult> {
let options = options.map(|o| o.into());
let options = options.unwrap_or_default();

let result = self.ctx.read_fastq(file_path, options);
let result = self.ctx.read_fastq(file_path, options.into());
let df = wait_for_future(py, result).map_err(error::BioBearError::from)?;

Ok(PyExecutionResult::new(df))
}

/// Read a BCF file from the given path.
#[pyo3(signature = (file_path, *, options=None))]
fn read_bcf_file(
&mut self,
file_path: &str,
options: Option<BCFReadOptions>,
py: Python,
) -> PyResult<PyExecutionResult> {
let options = options.unwrap_or_default();

let result = self.ctx.read_bcf(file_path, options.into());
let df = wait_for_future(py, result).map_err(error::BioBearError::from)?;

Ok(PyExecutionResult::new(df))
Expand All @@ -67,9 +84,9 @@ impl ExonSessionContext {
options: Option<FASTAReadOptions>,
py: Python,
) -> PyResult<PyExecutionResult> {
let options = options.map(|o| o.into());
let options = options.unwrap_or_default();

let result = self.ctx.read_fasta(file_path, options);
let result = self.ctx.read_fasta(file_path, options.into());
let df = wait_for_future(py, result).map_err(error::BioBearError::from)?;

Ok(PyExecutionResult::new(df))
Expand Down Expand Up @@ -107,3 +124,8 @@ impl ExonSessionContext {
pub fn connect() -> PyResult<ExonSessionContext> {
Ok(ExonSessionContext::default())
}

/// Build an `ExonSessionContext` from its `Default` implementation.
#[pyfunction]
pub fn new_context() -> PyResult<ExonSessionContext> {
    let context = ExonSessionContext::default();
    Ok(context)
}
Loading

0 comments on commit ceafa47

Please sign in to comment.