diff --git a/Cargo.toml b/Cargo.toml
index 742ca28..7b852c3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ name = "biobear"
 [dependencies]
 arrow = {version = "50.0.0", features = ["pyarrow"]}
 datafusion = "35"
-exon = {version = "0.8.5", features = ["all"]}
+exon = {version = "0.8.6", features = ["all"]}
 pyo3 = "0.20"
 thiserror = "1.0"
 tokio = {version = "1", features = ["rt"]}
diff --git a/Makefile b/Makefile
index 0ce78dd..6b074ac 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,11 @@
 build:
-	cargo build --release
-	maturin develop --release
-
-test:
 	cargo build
 	maturin develop
-	pytest
 
 run-benchmarks:
 	hyperfine --runs 2 \
 		-n biopython 'python benchmarks/biopython-scan.py' \
 		-n biobear 'python benchmarks/biobear-scan.py'
+
+test:
+	bash ./bin/test.sh
diff --git a/README.md b/README.md
index baafc1f..642de52 100644
--- a/README.md
+++ b/README.md
@@ -87,9 +87,54 @@ df.head()
 # └──────────────┴─────────────────┴──────┴───────┴───┴────────────┴────────┴───────┴───────────────────────────────────┘
 ```
 
-## Using DuckDB
+## Ecosystem
 
-biobear can also be used to read files into a [duckdb][] database.
+BioBear aims to make it easy to move data to and from prominent Python data tools. Generally, if a tool can read Arrow, it can read BioBear's output. To call out a few examples:
+
+### Polars
+
+Session results and Reader objects can be converted to a Polars DataFrame.
+
+```python
+import biobear as bb
+
+session = bb.connect()
+
+df = session.sql("""
+    SELECT * FROM gff_scan('test.gff')
+""").to_polars()
+```
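+
+Reader objects work the same way; here is a small sketch using the `FastaReader` exercised in the tests (the `test.fasta` path is illustrative):
+
+```python
+from biobear import FastaReader
+
+# Read a FASTA file and convert the records to a Polars DataFrame
+df = FastaReader("test.fasta").to_polars()
+```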
+
+#### Known Issues
+
+For GenBank and mzML, a naive `SELECT *` will cause an error, because Polars doesn't support all Arrow types -- `Map` being the specific offender here. In these cases, select the fields from the map individually. Alternatively, you can first convert the table to a Pandas DataFrame, as sketched below.
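+
+As a sketch of the Pandas route (the `genbank_scan('test.gb')` query here is illustrative):
+
+```python
+import biobear as bb
+import polars as pl
+
+session = bb.connect()
+
+# Arrow and Pandas both accept Map-typed columns, so hop through
+# an Arrow table and Pandas on the way to Polars.
+arrow_table = session.sql("SELECT * FROM genbank_scan('test.gb')").to_arrow()
+df = pl.from_pandas(arrow_table.to_pandas())
+```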
+
+### DuckDB
+
+BioBear can also be used to read files into a [duckdb][] database.
 
 ```python
 import biobear as bb
diff --git a/bin/test.sh b/bin/test.sh
index 93ffdfc..22be1ae 100644
--- a/bin/test.sh
+++ b/bin/test.sh
@@ -23,6 +23,9 @@ function teardown {
 	echo "Teardown completed."
 }
 
+# Build the code
+cargo build
+
 # check docker and aws cli are installed
 if ! command -v docker &> /dev/null
 then
diff --git a/pyproject.toml b/pyproject.toml
index f6a472c..076539d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,7 +11,7 @@
 classifiers = [
   "Programming Language :: Python :: Implementation :: CPython",
   "Programming Language :: Python :: Implementation :: PyPy",
 ]
-dependencies = ["pyarrow>=12"]
+dependencies = ["pyarrow>=15"]
 license = {file = "LICENSE"}
 name = "biobear"
diff --git a/python/tests/test_fasta_reader.py b/python/tests/test_fasta_reader.py
index 864d411..54c9f65 100644
--- a/python/tests/test_fasta_reader.py
+++ b/python/tests/test_fasta_reader.py
@@ -21,6 +21,15 @@ def test_fasta_reader():
     assert len(df) == 2
 
 
+@pytest.mark.skipif(
+    not importlib.util.find_spec("polars"), reason="polars not installed"
+)
+def test_fasta_reader_missing_file():
+    with pytest.raises(OSError):
+        fasta_reader = FastaReader(DATA / "does-not-exist.fasta")
+        fasta_reader.to_polars()
+
+
 # Add test for to_pandas() method
 @pytest.mark.skipif(
     not importlib.util.find_spec("pandas"), reason="pandas not installed"
diff --git a/python/tests/test_session.py b/python/tests/test_session.py
index dfc8af8..3dd01f7 100644
--- a/python/tests/test_session.py
+++ b/python/tests/test_session.py
@@ -55,6 +55,22 @@ def test_to_polars():
     assert len(df) == 2
 
 
+@pytest.mark.skipif(
+    not importlib.util.find_spec("polars"), reason="polars not installed"
+)
+def test_to_polars_empty():
+    """Test converting to a polars dataframe when the query is empty."""
+
+    session = connect()
+
+    fasta_file = DATA / "test.fasta"
+    query = f"SELECT * FROM fasta_scan('{fasta_file}') WHERE id = 'not found'"
+
+    results = session.sql(query)
+    df = results.to_polars()
+    assert len(df) == 0
+
+
 def test_with_error():
     """Test what happens on a bad query."""
     session = connect()
diff --git a/src/bam_reader.rs b/src/bam_reader.rs
index 9e7bb54..6524f43 100644
--- a/src/bam_reader.rs
+++ b/src/bam_reader.rs
@@ -26,6 +26,8 @@ use tokio::runtime::Runtime;
 use std::io;
 use std::sync::Arc;
 
+use crate::error::BioBearError;
+
 #[pyclass(name = "_BamIndexedReader")]
 pub struct BamIndexedReader {
     path: String,
@@ -43,7 +45,7 @@ impl BamIndexedReader {
             ));
         }
 
-        let rt = Arc::new(Runtime::new().unwrap());
+        let rt = Arc::new(Runtime::new()?);
 
         Ok(Self {
             path: path.to_string(),
@@ -94,12 +96,14 @@
         })?;
 
         let mut stream_ptr = self._runtime.block_on(async {
-            let stream = df.execute_stream().await.unwrap();
+            let stream = df.execute_stream().await?;
             let dataset_record_batch_stream =
                 DataFrameRecordBatchStream::new(stream, self._runtime.clone());
 
-            FFI_ArrowArrayStream::new(Box::new(dataset_record_batch_stream))
-        });
+            Ok::<_, BioBearError>(FFI_ArrowArrayStream::new(Box::new(
+                dataset_record_batch_stream,
+            )))
+        })?;
 
         Python::with_gil(|py| unsafe {
             match ArrowArrayStreamReader::from_raw(&mut stream_ptr) {
diff --git a/src/bcf_reader.rs b/src/bcf_reader.rs
index 1db3cd1..40a1809 100644
--- a/src/bcf_reader.rs
+++ b/src/bcf_reader.rs
@@ -24,6 +24,8 @@ use exon::ExonSessionExt;
 use std::io;
 use std::sync::Arc;
 
+use crate::error::BioBearError;
+
 #[pyclass(name = "_BCFIndexedReader")]
 pub struct BCFIndexedReader {
     path: String,
@@ -41,7 +43,7 @@ impl BCFIndexedReader {
             ));
         }
 
-        let rt = Arc::new(Runtime::new().unwrap());
+        let rt = Arc::new(Runtime::new()?);
 
         Ok(Self {
             path: path.to_string(),
@@ -77,12 +79,14 @@
         })?;
 
         let mut stream_ptr = self._runtime.block_on(async {
-            let stream = df.execute_stream().await.unwrap();
+            let stream = df.execute_stream().await?;
             let dataset_record_batch_stream =
                 DataFrameRecordBatchStream::new(stream, self._runtime.clone());
 
-            FFI_ArrowArrayStream::new(Box::new(dataset_record_batch_stream))
-        });
+            Ok::<_, BioBearError>(FFI_ArrowArrayStream::new(Box::new(
+                dataset_record_batch_stream,
+            )))
+        })?;
 
         Python::with_gil(|py| unsafe {
             match ArrowArrayStreamReader::from_raw(&mut stream_ptr) {
diff --git a/src/error.rs b/src/error.rs
index e26985e..2e4d8aa 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -12,12 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use arrow::error::ArrowError;
 use datafusion::error::DataFusionError;
+use exon::error::ExonError;
 use pyo3::PyErr;
 
-#[derive(Debug, thiserror::Error)]
+#[derive(Debug)]
 pub enum BioBearError {
-    #[error("{0}")]
+    IOError(String),
     Other(String),
 }
 
@@ -30,13 +32,39 @@ impl BioBearError {
 impl From<BioBearError> for PyErr {
     fn from(value: BioBearError) -> Self {
         match value {
-            BioBearError::Other(msg) => PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(msg),
+            BioBearError::IOError(msg) => PyErr::new::<pyo3::exceptions::PyIOError, _>(msg),
+            BioBearError::Other(msg) => PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(msg),
         }
     }
 }
 
 impl From<DataFusionError> for BioBearError {
     fn from(value: DataFusionError) -> Self {
+        match value {
+            DataFusionError::IoError(msg) => Self::IOError(msg.to_string()),
+            DataFusionError::ObjectStore(err) => Self::IOError(err.to_string()),
+            _ => Self::Other(value.to_string()),
+        }
+    }
+}
+
+impl From<ExonError> for BioBearError {
+    fn from(value: ExonError) -> Self {
+        match value {
+            ExonError::IOError(e) => BioBearError::IOError(e.to_string()),
+            _ => BioBearError::Other("Other Error".to_string()),
+        }
+    }
+}
+
+impl From<ArrowError> for BioBearError {
+    fn from(value: ArrowError) -> Self {
         Self::Other(value.to_string())
     }
 }
+
+impl From<std::io::Error> for BioBearError {
+    fn from(value: std::io::Error) -> Self {
+        Self::IOError(value.to_string())
+    }
+}
diff --git a/src/execution_result.rs b/src/execution_result.rs
index a4dab9c..4a704d2 100644
--- a/src/execution_result.rs
+++ b/src/execution_result.rs
@@ -21,10 +21,13 @@ use arrow::{
 };
 use datafusion::prelude::DataFrame;
 use exon::ffi::DataFrameRecordBatchStream;
-use pyo3::{pyclass, pymethods, types::PyTuple, PyErr, PyObject, PyResult, Python, ToPyObject};
+use pyo3::{pyclass, pymethods, types::PyTuple, IntoPy, PyObject, PyResult, Python, ToPyObject};
 use tokio::runtime::Runtime;
 
-use crate::{error, runtime::wait_for_future};
+use crate::{
+    error::{self, BioBearError},
+    runtime::wait_for_future,
+};
 
 #[pyclass(name = "ExecutionResult", subclass)]
 #[derive(Clone)]
@@ -72,31 +75,30 @@ impl PyExecutionResult {
         let stream = wait_for_future(py, self.df.as_ref().clone().execute_stream())
             .map_err(error::BioBearError::from)?;
 
-        let runtime = Arc::new(Runtime::new().unwrap());
+        let runtime = Arc::new(Runtime::new()?);
 
         let dataframe_record_batch_stream = DataFrameRecordBatchStream::new(stream, runtime);
 
         let mut stream = FFI_ArrowArrayStream::new(Box::new(dataframe_record_batch_stream));
 
         Python::with_gil(|py| unsafe {
-            match ArrowArrayStreamReader::from_raw(&mut stream) {
-                Ok(stream_reader) => stream_reader.into_pyarrow(py),
-                Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
-                    "Error converting to pyarrow: {err}"
-                ))),
-            }
+            let stream_reader =
+                ArrowArrayStreamReader::from_raw(&mut stream).map_err(BioBearError::from)?;
+
+            stream_reader.into_pyarrow(py)
         })
     }
 
     /// Convert to Arrow Table
     fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
         let batches = self.collect(py)?.to_object(py);
+        let schema = self.schema().into_py(py);
 
         Python::with_gil(|py| {
             // Instantiate pyarrow Table object and use its from_batches method
             let table_class = py.import("pyarrow")?.getattr("Table")?;
-            let args = PyTuple::new(py, &[batches]);
+            let args = PyTuple::new(py, &[batches, schema]);
             let table: PyObject = table_class.call_method1("from_batches", args)?.into();
 
             Ok(table)
         })
@@ -105,15 +107,17 @@
     /// Convert to a Polars DataFrame
     fn to_polars(&self, py: Python) -> PyResult<PyObject> {
         let batches = self.collect(py)?.to_object(py);
+        let schema = self.schema().into_py(py);
 
         Python::with_gil(|py| {
             let table_class = py.import("pyarrow")?.getattr("Table")?;
-            let args = PyTuple::new(py, &[batches]);
+            let args = (batches, schema);
             let table: PyObject = table_class.call_method1("from_batches", args)?.into();
 
-            let table_class = py.import("polars")?.getattr("DataFrame")?;
-            let args = PyTuple::new(py, &[table]);
-            let result = table_class.call1(args)?.into();
+            let module = py.import("polars")?;
+            let args = (table,);
+            let result = module.call_method1("from_arrow", args)?.into();
+
             Ok(result)
         })
     }
diff --git a/src/exon_reader.rs b/src/exon_reader.rs
index 1a76358..421cbd3 100644
--- a/src/exon_reader.rs
+++ b/src/exon_reader.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::io;
 use std::str::FromStr;
 use std::sync::Arc;
 
@@ -26,6 +25,8 @@ use exon::{new_exon_config, ExonRuntimeEnvExt, ExonSessionExt};
 use pyo3::prelude::*;
 use tokio::runtime::Runtime;
 
+use crate::error::BioBearError;
+
 #[pyclass(name = "_ExonReader")]
 pub struct ExonReader {
     df: datafusion::dataframe::DataFrame,
@@ -39,8 +40,8 @@ impl ExonReader {
         file_type: ExonFileType,
         compression: Option<FileCompressionType>,
         batch_size: Option<usize>,
-    ) -> io::Result<Self> {
-        let rt = Arc::new(Runtime::new().unwrap());
+    ) -> Result<Self, BioBearError> {
+        let rt = Arc::new(Runtime::new()?);
 
         let mut config = new_exon_config();
 
@@ -53,25 +54,22 @@ impl ExonReader {
         let df = rt.block_on(async {
             ctx.runtime_env()
                 .exon_register_object_store_uri(path)
-                .await?;
-
-            match ctx.read_exon_table(path, file_type, compression).await {
-                Ok(df) => Ok(df),
-                Err(e) => Err(io::Error::new(
-                    io::ErrorKind::Other,
-                    format!("Error reading GFF file: {e}"),
-                )),
-            }
-        });
-
-        match df {
-            Ok(df) => Ok(Self {
-                df,
-                _runtime: rt,
-                exhausted: false,
-            }),
-            Err(e) => Err(e),
-        }
+                .await
+                .map_err(BioBearError::from)?;
+
+            let df = ctx
+                .read_exon_table(path, file_type, compression)
+                .await
+                .map_err(BioBearError::from)?;
+
+            Ok::<_, BioBearError>(df)
+        })?;
+
+        Ok(Self {
+            df,
+            _runtime: rt,
+            exhausted: false,
+        })
     }
 }
 
@@ -84,27 +82,17 @@ impl ExonReader {
         compression: Option<&str>,
         batch_size: Option<usize>,
     ) -> PyResult<Self> {
-        let exon_file_type = ExonFileType::from_str(file_type).map_err(|e| {
-            PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
-                "Error reading file type: {e:?}"
-            ))
-        })?;
+        let exon_file_type = ExonFileType::from_str(file_type).map_err(BioBearError::from)?;
 
-        let file_compression_type =
-            compression.map(
-                |compression| match FileCompressionType::from_str(compression) {
-                    Ok(compression_type) => Ok(compression_type),
-                    Err(e) => Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
-                        "Error reading compression type: {e:?}"
-                    ))),
-                },
-            );
+        let file_compression_type = compression
+            .map(FileCompressionType::from_str)
+            .transpose()
+            .map_err(BioBearError::from)?;
 
-        let file_compression_type = file_compression_type.transpose()?;
+        let open = Self::open(path, exon_file_type, file_compression_type, batch_size)
+            .map_err(BioBearError::from)?;
 
-        Self::open(path, exon_file_type, file_compression_type, batch_size).map_err(|e| {
-            PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("Error opening file {path}: {e}"))
-        })
+        Ok(open)
     }
 
     fn is_exhausted(&self) -> bool {
@@ -114,22 +102,27 @@
     #[allow(clippy::wrong_self_convention)]
     fn to_pyarrow(&mut self) -> PyResult<PyObject> {
         let mut stream_ptr = self._runtime.block_on(async {
-            let stream = self.df.clone().execute_stream().await.unwrap();
+            let stream = self
+                .df
+                .clone()
+                .execute_stream()
+                .await
+                .map_err::<BioBearError, _>(|e| e.into())?;
+
             let dataset_record_batch_stream =
                 DataFrameRecordBatchStream::new(stream, self._runtime.clone());
 
-            FFI_ArrowArrayStream::new(Box::new(dataset_record_batch_stream))
-        });
+            Ok::<_, BioBearError>(FFI_ArrowArrayStream::new(Box::new(
+                dataset_record_batch_stream,
+            )))
+        })?;
 
         self.exhausted = true;
 
         Python::with_gil(|py| unsafe {
-            match ArrowArrayStreamReader::from_raw(&mut stream_ptr) {
-                Ok(stream_reader) => stream_reader.into_pyarrow(py),
-                Err(err) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
-                    "Error converting to pyarrow: {err}"
-                ))),
-            }
+            ArrowArrayStreamReader::from_raw(&mut stream_ptr)
+                .map_err(BioBearError::from)?
+                .into_pyarrow(py)
         })
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 09033ee..416743b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -44,8 +44,7 @@ fn biobear(_py: Python, m: &PyModule) -> PyResult<()> {
             format!("biobear-python-thread-{}", id)
         })
         .enable_all()
-        .build()
-        .unwrap();
+        .build()?;
 
     m.add("__runtime", TokioRuntime(runtime))?;
 
diff --git a/src/vcf_reader.rs b/src/vcf_reader.rs
index e3b5c0c..24b7ff5 100644
--- a/src/vcf_reader.rs
+++ b/src/vcf_reader.rs
@@ -24,6 +24,8 @@ use exon::{new_exon_config, ExonSessionExt};
 use std::io;
 use std::sync::Arc;
 
+use crate::error::BioBearError;
+
 #[pyclass(name = "_VCFIndexedReader")]
 pub struct VCFIndexedReader {
     path: String,
@@ -41,7 +43,7 @@ impl VCFIndexedReader {
             ));
         }
 
-        let rt = Arc::new(Runtime::new().unwrap());
+        let rt = Arc::new(Runtime::new()?);
 
         Ok(Self {
             path: path.to_string(),
@@ -77,12 +79,14 @@
         })?;
 
         let mut stream_ptr = self._runtime.block_on(async {
-            let stream = df.execute_stream().await.unwrap();
+            let stream = df.execute_stream().await?;
             let dataset_record_batch_stream =
                 DataFrameRecordBatchStream::new(stream, self._runtime.clone());
 
-            FFI_ArrowArrayStream::new(Box::new(dataset_record_batch_stream))
-        });
+            Ok::<_, BioBearError>(FFI_ArrowArrayStream::new(Box::new(
+                dataset_record_batch_stream,
+            )))
+        })?;
 
         Python::with_gil(|py| unsafe {
             match ArrowArrayStreamReader::from_raw(&mut stream_ptr) {