feat: fix value error for missing file #92

Merged · 6 commits · Feb 5, 2024
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -10,7 +10,7 @@ name = "biobear"
 [dependencies]
 arrow = {version = "50.0.0", features = ["pyarrow"]}
 datafusion = "35"
-exon = {version = "0.8.5", features = ["all"]}
+exon = {version = "0.8.6", features = ["all"]}
 pyo3 = "0.20"
 thiserror = "1.0"
 tokio = {version = "1", features = ["rt"]}
8 changes: 3 additions & 5 deletions Makefile
@@ -1,13 +1,11 @@
 build:
 	cargo build --release
 	maturin develop --release
 
-test:
-	cargo build
-	maturin develop
-	pytest
-
 run-benchmarks:
 	hyperfine --runs 2 \
 		-n biopython 'python benchmarks/biopython-scan.py' \
 		-n biobear 'python benchmarks/biobear-scan.py'
+
+test:
+	bash ./bin/test.sh
26 changes: 24 additions & 2 deletions README.md
@@ -87,9 +87,31 @@ df.head()
 # └──────────────┴─────────────────┴──────┴───────┴───┴────────────┴────────┴───────┴───────────────────────────────────┘
 ```
 
-## Using DuckDB
+## Ecosystem
 
-biobear can also be used to read files into a [duckdb][] database.
+BioBear aims to make it easy to move data to and from other prominent data tools in Python. Generally, if a tool can read Arrow, it can read BioBear's output. To call out a few examples:
+
+### Polars
+
+Session results and Reader objects can be converted to a Polars DataFrame.
+
+```python
+import biobear as bb
+
+session = bb.connect()
+
+df = session.sql("""
+    SELECT * FROM gff_scan('test.gff')
+""").to_polars()
+```
+
+#### Known Issues
+
+For GenBank and mzML, a naive `SELECT *` will cause an error, because Polars doesn't support all Arrow types -- `Map` being the specific offender here. In these cases, select the fields of the map individually, or first convert the table to a Pandas DataFrame (see the sketch after this diff).
+
+### DuckDB
+
+BioBear can also be used to read files into a [duckdb][] database.
 
 ```python
 import biobear as bb
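A sketch of the Known Issues workaround described above, for reference. This snippet is not part of the PR: the `genbank_scan` table function and the `name`/`sequence` column names are assumptions for illustration, but the `to_arrow()` route is the API shown in this diff.

```python
import biobear as bb

session = bb.connect()

# Hypothetical GenBank file and column names, for illustration only.
results = session.sql("SELECT * FROM genbank_scan('sample.gb')")

# Pandas route: a pyarrow Table converts to pandas even with Map columns.
pandas_df = results.to_arrow().to_pandas()

# Polars route: project around the Map-typed column instead of SELECT *.
polars_df = session.sql(
    "SELECT name, sequence FROM genbank_scan('sample.gb')"
).to_polars()
```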
3 changes: 3 additions & 0 deletions bin/test.sh
@@ -23,6 +23,9 @@ function teardown {
     echo "Teardown completed."
 }
 
+# Build the code
+cargo build
+
 # check docker and aws cli are installed
 if ! command -v docker &> /dev/null
 then
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -11,7 +11,7 @@ classifiers = [
     "Programming Language :: Python :: Implementation :: CPython",
     "Programming Language :: Python :: Implementation :: PyPy",
 ]
-dependencies = ["pyarrow>=12"]
+dependencies = ["pyarrow>=15"]
 
 license = {file = "LICENSE"}
 name = "biobear"
9 changes: 9 additions & 0 deletions python/tests/test_fasta_reader.py
@@ -21,6 +21,15 @@ def test_fasta_reader():
     assert len(df) == 2
 
 
+@pytest.mark.skipif(
+    not importlib.util.find_spec("polars"), reason="polars not installed"
+)
+def test_fasta_reader_missing_file():
+    with pytest.raises(OSError):
+        fasta_reader = FastaReader(DATA / "does-not-exist.fasta")
+        fasta_reader.to_polars()
+
+
 # Add test for to_pandas() method
 @pytest.mark.skipif(
     not importlib.util.find_spec("pandas"), reason="pandas not installed"
16 changes: 16 additions & 0 deletions python/tests/test_session.py
@@ -55,6 +55,22 @@ def test_to_polars():
     assert len(df) == 2
 
 
+@pytest.mark.skipif(
+    not importlib.util.find_spec("polars"), reason="polars not installed"
+)
+def test_to_polars_empty():
+    """Test converting to a polars dataframe when the query is empty."""
+
+    session = connect()
+
+    fasta_file = DATA / "test.fasta"
+    query = f"SELECT * FROM fasta_scan('{fasta_file}') WHERE id = 'not found'"
+
+    results = session.sql(query)
+    df = results.to_polars()
+    assert len(df) == 0
+
+
 def test_with_error():
     """Test what happens on a bad query."""
     session = connect()
12 changes: 8 additions & 4 deletions src/bam_reader.rs
@@ -26,6 +26,8 @@ use tokio::runtime::Runtime;
 use std::io;
 use std::sync::Arc;
 
+use crate::error::BioBearError;
+
 #[pyclass(name = "_BamIndexedReader")]
 pub struct BamIndexedReader {
     path: String,
@@ -43,7 +45,7 @@ impl BamIndexedReader {
             ));
         }
 
-        let rt = Arc::new(Runtime::new().unwrap());
+        let rt = Arc::new(Runtime::new()?);
 
         Ok(Self {
             path: path.to_string(),
@@ -94,12 +96,14 @@
         })?;
 
         let mut stream_ptr = self._runtime.block_on(async {
-            let stream = df.execute_stream().await.unwrap();
+            let stream = df.execute_stream().await?;
             let dataset_record_batch_stream =
                 DataFrameRecordBatchStream::new(stream, self._runtime.clone());
 
-            FFI_ArrowArrayStream::new(Box::new(dataset_record_batch_stream))
-        });
+            Ok::<FFI_ArrowArrayStream, BioBearError>(FFI_ArrowArrayStream::new(Box::new(
+                dataset_record_batch_stream,
+            )))
+        })?;
 
         Python::with_gil(|py| unsafe {
             match ArrowArrayStreamReader::from_raw(&mut stream_ptr) {
12 changes: 8 additions & 4 deletions src/bcf_reader.rs
@@ -24,6 +24,8 @@ use exon::ExonSessionExt;
 use std::io;
 use std::sync::Arc;
 
+use crate::error::BioBearError;
+
 #[pyclass(name = "_BCFIndexedReader")]
 pub struct BCFIndexedReader {
     path: String,
@@ -41,7 +43,7 @@ impl BCFIndexedReader {
             ));
         }
 
-        let rt = Arc::new(Runtime::new().unwrap());
+        let rt = Arc::new(Runtime::new()?);
 
         Ok(Self {
             path: path.to_string(),
@@ -77,12 +79,14 @@
         })?;
 
         let mut stream_ptr = self._runtime.block_on(async {
-            let stream = df.execute_stream().await.unwrap();
+            let stream = df.execute_stream().await?;
             let dataset_record_batch_stream =
                 DataFrameRecordBatchStream::new(stream, self._runtime.clone());
 
-            FFI_ArrowArrayStream::new(Box::new(dataset_record_batch_stream))
-        });
+            Ok::<_, BioBearError>(FFI_ArrowArrayStream::new(Box::new(
+                dataset_record_batch_stream,
+            )))
+        })?;
 
         Python::with_gil(|py| unsafe {
             match ArrowArrayStreamReader::from_raw(&mut stream_ptr) {
34 changes: 31 additions & 3 deletions src/error.rs
@@ -12,12 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use arrow::error::ArrowError;
+use datafusion::error::DataFusionError;
+use exon::error::ExonError;
 use pyo3::PyErr;
 
-#[derive(Debug, thiserror::Error)]
+#[derive(Debug)]
 pub enum BioBearError {
-    #[error("{0}")]
+    IOError(String),
     Other(String),
 }
 
@@ -30,13 +32,39 @@ impl BioBearError {
 impl From<BioBearError> for PyErr {
     fn from(value: BioBearError) -> Self {
         match value {
-            BioBearError::Other(msg) => PyErr::new::<pyo3::exceptions::PyValueError, _>(msg),
+            BioBearError::IOError(msg) => PyErr::new::<pyo3::exceptions::PyIOError, _>(msg),
+            BioBearError::Other(msg) => PyErr::new::<pyo3::exceptions::PyIOError, _>(msg),
         }
     }
 }
 
+impl From<DataFusionError> for BioBearError {
+    fn from(value: DataFusionError) -> Self {
+        match value {
+            DataFusionError::IoError(msg) => Self::IOError(msg.to_string()),
+            DataFusionError::ObjectStore(err) => Self::IOError(err.to_string()),
+            _ => Self::Other(value.to_string()),
+        }
+    }
+}
+
+impl From<ExonError> for BioBearError {
+    fn from(value: ExonError) -> Self {
+        match value {
+            ExonError::IOError(e) => BioBearError::IOError(e.to_string()),
+            _ => BioBearError::Other("Other Error".to_string()),
+        }
+    }
+}
+
+impl From<ArrowError> for BioBearError {
+    fn from(value: ArrowError) -> Self {
+        Self::Other(value.to_string())
+    }
+}
+
+impl From<std::io::Error> for BioBearError {
+    fn from(value: std::io::Error) -> Self {
+        Self::IOError(value.to_string())
+    }
+}
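Taken together, these `From` impls are what let the readers above swap `.unwrap()` for `?`: the `?` operator converts `DataFusionError`, `ExonError`, `ArrowError`, and `std::io::Error` into `BioBearError` automatically, and the `PyErr` conversion then surfaces I/O failures in Python as `IOError` (an alias of `OSError` in Python 3) rather than `ValueError`. A minimal sketch of the intended user-visible behavior, not taken from the PR itself:

```python
import biobear as bb

session = bb.connect()

try:
    # Previously a missing file surfaced as ValueError; with the error
    # mapping above it is raised as OSError (Python's IOError alias),
    # matching the pytest.raises(OSError) assertion in the new test.
    session.sql("SELECT * FROM fasta_scan('does-not-exist.fasta')").to_polars()
except OSError as err:
    print(f"caught: {err}")
```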
34 changes: 20 additions & 14 deletions src/execution_result.rs
@@ -21,10 +21,13 @@ use arrow::{
 };
 use datafusion::prelude::DataFrame;
 use exon::ffi::DataFrameRecordBatchStream;
-use pyo3::{pyclass, pymethods, types::PyTuple, PyErr, PyObject, PyResult, Python, ToPyObject};
+use pyo3::{pyclass, pymethods, types::PyTuple, IntoPy, PyObject, PyResult, Python, ToPyObject};
 use tokio::runtime::Runtime;
 
-use crate::{error, runtime::wait_for_future};
+use crate::{
+    error::{self, BioBearError},
+    runtime::wait_for_future,
+};
 
 #[pyclass(name = "ExecutionResult", subclass)]
 #[derive(Clone)]
@@ -72,31 +75,30 @@ impl PyExecutionResult {
         let stream = wait_for_future(py, self.df.as_ref().clone().execute_stream())
             .map_err(error::BioBearError::from)?;
 
-        let runtime = Arc::new(Runtime::new().unwrap());
+        let runtime = Arc::new(Runtime::new()?);
 
         let dataframe_record_batch_stream = DataFrameRecordBatchStream::new(stream, runtime);
 
         let mut stream = FFI_ArrowArrayStream::new(Box::new(dataframe_record_batch_stream));
 
         Python::with_gil(|py| unsafe {
-            match ArrowArrayStreamReader::from_raw(&mut stream) {
-                Ok(stream_reader) => stream_reader.into_pyarrow(py),
-                Err(err) => Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
-                    "Error converting to pyarrow: {err}"
-                ))),
-            }
+            let stream_reader =
+                ArrowArrayStreamReader::from_raw(&mut stream).map_err(BioBearError::from)?;
+
+            stream_reader.into_pyarrow(py)
         })
     }
 
     /// Convert to Arrow Table
     fn to_arrow(&self, py: Python) -> PyResult<PyObject> {
         let batches = self.collect(py)?.to_object(py);
+        let schema = self.schema().into_py(py);
 
         Python::with_gil(|py| {
             // Instantiate pyarrow Table object and use its from_batches method
             let table_class = py.import("pyarrow")?.getattr("Table")?;
 
-            let args = PyTuple::new(py, &[batches]);
+            let args = PyTuple::new(py, &[batches, schema]);
             let table: PyObject = table_class.call_method1("from_batches", args)?.into();
             Ok(table)
         })
@@ -105,15 +107,19 @@
     /// Convert to a Polars DataFrame
     fn to_polars(&self, py: Python) -> PyResult<PyObject> {
         let batches = self.collect(py)?.to_object(py);
+        let schema = self.schema().into_py(py);
+
+        let schema = schema.into_py(py);
 
         Python::with_gil(|py| {
             let table_class = py.import("pyarrow")?.getattr("Table")?;
-            let args = PyTuple::new(py, &[batches]);
+            let args = (batches, schema);
             let table: PyObject = table_class.call_method1("from_batches", args)?.into();
 
-            let table_class = py.import("polars")?.getattr("DataFrame")?;
-            let args = PyTuple::new(py, &[table]);
-            let result = table_class.call1(args)?.into();
+            let module = py.import("polars")?;
+            let args = (table,);
+            let result = module.call_method1("from_arrow", args)?.into();
 
             Ok(result)
         })
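Why the schema now rides along with the batches: `pyarrow.Table.from_batches` infers the schema from the first batch, so a stream that yields zero batches (an empty query result, as in `test_to_polars_empty` above) left it nothing to infer from and raised. Passing the schema explicitly, and building the Polars frame via `polars.from_arrow` instead of the `DataFrame` constructor, keeps the empty case well-formed. A minimal pyarrow-level sketch of the failure mode and the fix; the column names are illustrative only:

```python
import pyarrow as pa
import polars as pl

# Illustrative schema; in biobear the real one comes from the query plan.
schema = pa.schema([("id", pa.string()), ("sequence", pa.string())])

# With zero batches and no schema, pa.Table.from_batches([]) raises
# because there is nothing to infer from; the explicit schema fixes it.
empty = pa.Table.from_batches([], schema=schema)
assert empty.num_rows == 0

# polars.from_arrow accepts the empty (but typed) table directly.
df = pl.from_arrow(empty)
assert df.height == 0
```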