Skip to content

Commit

Permalink
Implement FromPyObject for PyArrowBuffer (#241)
Browse files Browse the repository at this point in the history
* Implement FromPyObject for PyArrowBuffer

* Add test for buffer
  • Loading branch information
kylebarron authored Oct 16, 2024
1 parent 6d513ea commit 64a6591
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 34 deletions.
5 changes: 5 additions & 0 deletions arro3-core/python/arro3/core/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ class ArrayReader:
def field(self) -> Field:
"""Access the field of this reader."""

class Buffer:
"""An Arrow Buffer"""
def __init__(self, buffer) -> None: ...
def __buffer__(self, flags: int) -> memoryview: ...

class ChunkedArray:
"""An Arrow ChunkedArray."""
@overload
Expand Down
21 changes: 20 additions & 1 deletion arro3-core/python/arro3/core/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import array as _array
import mmap
import sys
from typing import TYPE_CHECKING, Protocol, Tuple, Union

if TYPE_CHECKING:
Expand Down Expand Up @@ -78,8 +81,24 @@ class BufferProtocolExportable(Protocol):
def __buffer__(self, flags: int) -> memoryview: ...


# From numpy
# https://github.com/numpy/numpy/blob/961b70f6aaeed67147245b56ddb3f12ed1a050b5/numpy/__init__.pyi#L1772C1-L1785C1
if sys.version_info >= (3, 12):
from collections.abc import Buffer as _SupportsBuffer
else:
_SupportsBuffer = Union[
bytes,
bytearray,
memoryview,
_array.array,
mmap.mmap,
"np.ndarray",
BufferProtocolExportable,
]


# Numpy arrays don't yet declare `__buffer__` (or maybe just on a very recent version)
ArrayInput = Union[ArrowArrayExportable, BufferProtocolExportable, "np.ndarray"]
ArrayInput = Union[ArrowArrayExportable, BufferProtocolExportable, _SupportsBuffer]
"""Accepted input as an Arrow array.
Buffer protocol input (such as numpy arrays) will be interpreted zero-copy except in the
Expand Down
2 changes: 1 addition & 1 deletion arro3-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn _core(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {

m.add_class::<pyo3_arrow::PyArray>()?;
m.add_class::<pyo3_arrow::PyArrayReader>()?;
m.add_class::<pyo3_arrow::PyArrowBuffer>()?;
m.add_class::<pyo3_arrow::buffer::PyArrowBuffer>()?;
m.add_class::<pyo3_arrow::PyChunkedArray>()?;
m.add_class::<pyo3_arrow::PyDataType>()?;
m.add_class::<pyo3_arrow::PyField>()?;
Expand Down
6 changes: 2 additions & 4 deletions pyo3-arrow/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,15 @@ impl PyArray {
}

#[cfg(feature = "buffer_protocol")]
fn buffer(&self) -> crate::PyArrowBuffer {
fn buffer(&self) -> crate::buffer::PyArrowBuffer {
use arrow::array::AsArray;

match self.array.data_type() {
DataType::Int64 => {
let arr = self.array.as_primitive::<Int64Type>();
let values = arr.values();
let buffer = values.inner().clone();
crate::PyArrowBuffer {
inner: Some(buffer),
}
crate::buffer::PyArrowBuffer::from_arrow(buffer)
}
_ => todo!(),
}
Expand Down
118 changes: 93 additions & 25 deletions pyo3-arrow/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,31 @@ use crate::PyArray;
/// the core buffer into Python. This allows for zero-copy data sharing with numpy via
/// `numpy.frombuffer`.
#[pyclass(module = "arro3.core._core", name = "Buffer", subclass)]
pub struct PyArrowBuffer {
pub(crate) inner: Option<Buffer>,
pub struct PyArrowBuffer(Buffer);

impl AsRef<Buffer> for PyArrowBuffer {
fn as_ref(&self) -> &Buffer {
&self.0
}
}

#[pymethods]
impl PyArrowBuffer {
pub(crate) fn from_arrow(buffer: Buffer) -> Self {
Self(buffer)
}

/// Consume and return the [Buffer]
pub fn into_inner(self) -> Buffer {
self.0
}
}

#[pymethods]
impl PyArrowBuffer {
/// new
#[new]
pub fn new(buf: Vec<u8>) -> Self {
Self {
inner: Some(Buffer::from_vec(buf)),
}
pub fn new(buf: PyArrowBuffer) -> Self {
buf
}

/// This is taken from opendal:
Expand All @@ -49,27 +61,32 @@ impl PyArrowBuffer {
view: *mut ffi::Py_buffer,
flags: c_int,
) -> PyResult<()> {
if let Some(buf) = &slf.inner {
let bytes = buf.as_slice();
let ret = ffi::PyBuffer_FillInfo(
view,
slf.as_ptr() as *mut _,
bytes.as_ptr() as *mut _,
bytes.len().try_into().unwrap(),
1, // read only
flags,
);
if ret == -1 {
return Err(PyErr::fetch(slf.py()));
}
Ok(())
} else {
Err(PyValueError::new_err("Buffer has already been disposed"))
let bytes = slf.0.as_slice();
let ret = ffi::PyBuffer_FillInfo(
view,
slf.as_ptr() as *mut _,
bytes.as_ptr() as *mut _,
bytes.len().try_into().unwrap(),
1, // read only
flags,
);
if ret == -1 {
return Err(PyErr::fetch(slf.py()));
}
Ok(())
}

unsafe fn __releasebuffer__(mut slf: PyRefMut<Self>, _view: *mut ffi::Py_buffer) {
slf.inner.take();
unsafe fn __releasebuffer__(&self, _view: *mut ffi::Py_buffer) {}
}

impl<'py> FromPyObject<'py> for PyArrowBuffer {
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
let buffer = ob.extract::<AnyBufferProtocol>()?;
if !matches!(buffer, AnyBufferProtocol::UInt8(_)) {
return Err(PyValueError::new_err("Expected u8 buffer protocol object"));
}

Ok(Self(buffer.into_arrow_buffer()?))
}
}

Expand Down Expand Up @@ -374,6 +391,57 @@ impl AnyBufferProtocol {
}
}

/// Consume this buffer protocol object and convert to an Arrow [Buffer].
pub fn into_arrow_buffer(self) -> PyArrowResult<Buffer> {
let len_bytes = self.len_bytes()?;
let ptr = NonNull::new(self.buf_ptr()? as _)
.ok_or(PyValueError::new_err("Expected buffer ptr to be non null"))?;

let buffer = match self {
Self::UInt8(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::UInt16(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::UInt32(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::UInt64(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::Int8(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::Int16(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::Int32(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::Int64(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::Float32(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
Self::Float64(buf) => {
let owner = Arc::new(buf);
unsafe { Buffer::from_custom_allocation(ptr, len_bytes, owner) }
}
};
Ok(buffer)
}

fn item_count(&self) -> PyResult<usize> {
let out = match self {
Self::UInt8(buf) => buf.inner()?.item_count(),
Expand Down
2 changes: 0 additions & 2 deletions pyo3-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ mod utils;

pub use array::PyArray;
pub use array_reader::PyArrayReader;
#[cfg(feature = "buffer_protocol")]
pub use buffer::PyArrowBuffer;
pub use chunked::PyChunkedArray;
pub use datatypes::PyDataType;
pub use field::PyField;
Expand Down
11 changes: 10 additions & 1 deletion tests/core/test_buffer_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import numpy as np
import pyarrow as pa
import pytest
from arro3.core import Array
from arro3.core import Array, Buffer


def test_from_buffer():
Expand Down Expand Up @@ -66,3 +66,12 @@ def test_multi_dimensional():
assert pa_arr.type.value_type.list_size == 3
assert pa_arr.type.value_type.value_type.list_size == 2
assert pa_arr.type.value_type.value_type.value_type == pa.uint8()


def test_round_trip_buffer():
arr = np.arange(5, dtype=np.uint8)
buffer = Buffer(arr)
retour = np.frombuffer(buffer, dtype=np.uint8)
assert np.array_equal(arr, retour)

assert np.array_equal(arr, Array(buffer).to_numpy())

0 comments on commit 64a6591

Please sign in to comment.