diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index 0ffa5291812a9..4ec79a73029b4 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -1967,6 +1967,14 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
   return ImportRecordBatch(array, *maybe_schema);
 }
 
+Result<std::shared_ptr<MemoryManager>> DefaultDeviceMapper(ArrowDeviceType device_type,
+                                                           int64_t device_id) {
+  if (device_type != ARROW_DEVICE_CPU) {
+    return Status::NotImplemented("Only importing data on CPU is supported");
+  }
+  return default_cpu_memory_manager();
+}
+
 Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
                                                  std::shared_ptr<DataType> type,
                                                  const DeviceMemoryMapper& mapper) {
diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h
index e98a42818f628..0ced3d38cd1e6 100644
--- a/cpp/src/arrow/c/bridge.h
+++ b/cpp/src/arrow/c/bridge.h
@@ -218,6 +218,10 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch,
 using DeviceMemoryMapper =
     std::function<Result<std::shared_ptr<MemoryManager>>(ArrowDeviceType, int64_t)>;
 
+ARROW_EXPORT
+Result<std::shared_ptr<MemoryManager>> DefaultDeviceMapper(ArrowDeviceType device_type,
+                                                           int64_t device_id);
+
 /// \brief EXPERIMENTAL: Import C++ device array from the C data interface.
 ///
 /// The ArrowArray struct has its contents moved (as per the C data interface
@@ -226,12 +230,13 @@ using DeviceMemoryMapper =
 ///
 /// \param[in,out] array C data interface struct holding the array data
 /// \param[in] type type of the imported array
-/// \param[in] mapper A function to map device + id to memory manager
+/// \param[in] mapper A function to map device + id to memory manager. If not
+/// specified, defaults to mapping the CPU device to the default CPU memory manager.
 /// \return Imported array object
 ARROW_EXPORT
-Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
-                                                 std::shared_ptr<DataType> type,
-                                                 const DeviceMemoryMapper& mapper);
+Result<std::shared_ptr<Array>> ImportDeviceArray(
+    struct ArrowDeviceArray* array, std::shared_ptr<DataType> type,
+    const DeviceMemoryMapper& mapper = DefaultDeviceMapper);
 
 /// \brief EXPERIMENTAL: Import C++ device array and its type from the C data interface.
 ///
@@ -242,12 +247,13 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
 ///
 /// \param[in,out] array C data interface struct holding the array data
 /// \param[in,out] type C data interface struct holding the array type
-/// \param[in] mapper A function to map device + id to memory manager
+/// \param[in] mapper A function to map device + id to memory manager. If not
+/// specified, defaults to mapping the CPU device to the default CPU memory manager.
 /// \return Imported array object
 ARROW_EXPORT
-Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
-                                                 struct ArrowSchema* type,
-                                                 const DeviceMemoryMapper& mapper);
+Result<std::shared_ptr<Array>> ImportDeviceArray(
+    struct ArrowDeviceArray* array, struct ArrowSchema* type,
+    const DeviceMemoryMapper& mapper = DefaultDeviceMapper);
 
 /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data
 /// interface.
@@ -259,12 +265,13 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
 ///
 /// \param[in,out] array C data interface struct holding the record batch data
 /// \param[in] schema schema of the imported record batch
-/// \param[in] mapper A function to map device + id to memory manager
+/// \param[in] mapper A function to map device + id to memory manager. If not
+/// specified, defaults to mapping the CPU device to the default CPU memory manager.
 /// \return Imported record batch object
 ARROW_EXPORT
 Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
     struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
-    const DeviceMemoryMapper& mapper);
+    const DeviceMemoryMapper& mapper = DefaultDeviceMapper);
 
 /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema
 /// from the C data interface.
@@ -278,12 +285,13 @@ Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
 ///
 /// \param[in,out] array C data interface struct holding the record batch data
 /// \param[in,out] schema C data interface struct holding the record batch schema
-/// \param[in] mapper A function to map device + id to memory manager
+/// \param[in] mapper A function to map device + id to memory manager. If not
+/// specified, defaults to mapping the CPU device to the default CPU memory manager.
 /// \return Imported record batch object
 ARROW_EXPORT
 Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
     struct ArrowDeviceArray* array, struct ArrowSchema* schema,
-    const DeviceMemoryMapper& mapper);
+    const DeviceMemoryMapper& mapper = DefaultDeviceMapper);
 
 /// @}
 
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index ad01d45571ba1..e1bf494920566 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -1778,6 +1778,70 @@ cdef class Array(_PandasConvertible):
 
         return pyarrow_wrap_array(array)
 
+    def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
+        """
+        Export to a C ArrowDeviceArray struct, given its pointer.
+
+        If a C ArrowSchema struct pointer is also given, the array type
+        is exported to it at the same time.
+
+        Parameters
+        ----------
+        out_ptr: int
+            The raw pointer to a C ArrowDeviceArray struct.
+        out_schema_ptr: int (optional)
+            The raw pointer to a C ArrowSchema struct.
+
+        Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
+        array memory will leak.  This is a low-level function intended for
+        expert users.
+        """
+        cdef:
+            void* c_ptr = _as_c_pointer(out_ptr)
+            void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
+                                               allow_null=True)
+        with nogil:
+            check_status(ExportDeviceArray(
+                deref(self.sp_array), <shared_ptr[CSyncEvent]>NULL,
+                <ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
+
+    @staticmethod
+    def _import_from_c_device(in_ptr, type):
+        """
+        Import Array from a C ArrowDeviceArray struct, given its pointer
+        and the imported array type.
+
+        Parameters
+        ----------
+        in_ptr: int
+            The raw pointer to a C ArrowDeviceArray struct.
+        type: DataType or int
+            Either a DataType object, or the raw pointer to a C ArrowSchema
+            struct.
+
+        This is a low-level function intended for expert users.
+        """
+        cdef:
+            void* c_ptr = _as_c_pointer(in_ptr)
+            void* c_type_ptr
+            shared_ptr[CArray] c_array
+
+        c_type = pyarrow_unwrap_data_type(type)
+        if c_type == nullptr:
+            # Not a DataType object, perhaps a raw ArrowSchema pointer
+            c_type_ptr = _as_c_pointer(type)
+            with nogil:
+                c_array = GetResultValue(
+                    ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
+                                      <ArrowSchema*> c_type_ptr)
+                )
+        else:
+            with nogil:
+                c_array = GetResultValue(
+                    ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
+                )
+        return pyarrow_wrap_array(c_array)
+
     def __dlpack__(self, stream=None):
         """Export a primitive array as a DLPack capsule.
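Example (not part of the patch): a minimal sketch of how the new `_export_to_c_device` / `_import_from_c_device` entry points added above are meant to be used from Python. It assumes a pyarrow build that contains this patch and relies on the `struct ArrowDeviceArray` cffi definition added to `python/pyarrow/cffi.py` further down; it mirrors the checks in `test_cffi.py`.

```python
import pyarrow as pa
from pyarrow.cffi import ffi

# Allocate C-level structs for the device array and (optionally) the schema.
c_schema = ffi.new("struct ArrowSchema*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))
c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
# Move the array (and its type) into the C structs.
arr._export_to_c_device(ptr_array, ptr_schema)
assert c_array.device_type == 1   # ARROW_DEVICE_CPU
assert c_array.device_id == -1

# Re-import: ownership moves back into a new Array. The default
# DeviceMemoryMapper on the C++ side only accepts CPU-resident data.
arr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
assert arr_new.to_pylist() == [[1], [2, 42]]
```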
diff --git a/python/pyarrow/cffi.py b/python/pyarrow/cffi.py
index 961b61dee59fd..1da1a91691404 100644
--- a/python/pyarrow/cffi.py
+++ b/python/pyarrow/cffi.py
@@ -64,6 +64,16 @@
       // Opaque producer-specific data
       void* private_data;
     };
+
+    typedef int32_t ArrowDeviceType;
+
+    struct ArrowDeviceArray {
+      struct ArrowArray array;
+      int64_t device_id;
+      ArrowDeviceType device_type;
+      void* sync_event;
+      int64_t reserved[3];
+    };
     """
 
 # TODO use out-of-line mode for faster import and avoid C parsing
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 05d33180209c6..bc9d05ddbbc37 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -346,6 +346,12 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     CResult[unique_ptr[CResizableBuffer]] AllocateResizableBuffer(
         const int64_t size, CMemoryPool* pool)
 
+    cdef cppclass CSyncEvent" arrow::Device::SyncEvent":
+        pass
+
+    cdef cppclass CDevice" arrow::Device":
+        pass
+
     cdef CMemoryPool* c_default_memory_pool" arrow::default_memory_pool"()
     cdef CMemoryPool* c_system_memory_pool" arrow::system_memory_pool"()
     cdef CStatus c_jemalloc_memory_pool" arrow::jemalloc_memory_pool"(
@@ -2902,6 +2908,9 @@ cdef extern from "arrow/c/abi.h":
     cdef struct ArrowArrayStream:
         void (*release)(ArrowArrayStream*) noexcept nogil
 
+    cdef struct ArrowDeviceArray:
+        pass
+
 cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
     CStatus ExportType(CDataType&, ArrowSchema* out)
     CResult[shared_ptr[CDataType]] ImportType(ArrowSchema*)
@@ -2934,6 +2943,20 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
     CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*)
     CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*)
 
+    CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent],
+                              ArrowDeviceArray* out, ArrowSchema*)
+    CResult[shared_ptr[CArray]] ImportDeviceArray(
+        ArrowDeviceArray*, shared_ptr[CDataType])
+    CResult[shared_ptr[CArray]] ImportDeviceArray(
+        ArrowDeviceArray*, ArrowSchema*)
+
+    CStatus ExportDeviceRecordBatch(const CRecordBatch&, shared_ptr[CSyncEvent],
+                                    ArrowDeviceArray* out, ArrowSchema*)
+    CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
+        ArrowDeviceArray*, shared_ptr[CSchema])
+    CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
+        ArrowDeviceArray*, ArrowSchema*)
+
 cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
     CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 38b08b626113b..40d22494e6ffb 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -3145,6 +3145,68 @@ cdef class RecordBatch(_Tabular):
 
         return pyarrow_wrap_batch(c_batch)
 
+    def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
+        """
+        Export to a C ArrowDeviceArray struct, given its pointer.
+
+        If a C ArrowSchema struct pointer is also given, the record batch
+        schema is exported to it at the same time.
+
+        Parameters
+        ----------
+        out_ptr: int
+            The raw pointer to a C ArrowDeviceArray struct.
+        out_schema_ptr: int (optional)
+            The raw pointer to a C ArrowSchema struct.
+
+        Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
+        array memory will leak.  This is a low-level function intended for
+        expert users.
+ """ + cdef: + void* c_ptr = _as_c_pointer(out_ptr) + void* c_schema_ptr = _as_c_pointer(out_schema_ptr, + allow_null=True) + with nogil: + check_status(ExportDeviceRecordBatch( + deref(self.sp_batch), NULL, + c_ptr, c_schema_ptr) + ) + + @staticmethod + def _import_from_c_device(in_ptr, schema): + """ + Import RecordBatch from a C ArrowDeviceArray struct, given its pointer + and the imported schema. + + Parameters + ---------- + in_ptr: int + The raw pointer to a C ArrowDeviceArray struct. + type: Schema or int + Either a Schema object, or the raw pointer to a C ArrowSchema + struct. + + This is a low-level function intended for expert users. + """ + cdef: + void* c_ptr = _as_c_pointer(in_ptr) + void* c_schema_ptr + shared_ptr[CRecordBatch] c_batch + + c_schema = pyarrow_unwrap_schema(schema) + if c_schema == nullptr: + # Not a Schema object, perhaps a raw ArrowSchema pointer + c_schema_ptr = _as_c_pointer(schema, allow_null=True) + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema_ptr)) + else: + with nogil: + c_batch = GetResultValue(ImportDeviceRecordBatch( + c_ptr, c_schema)) + return pyarrow_wrap_batch(c_batch) + def _reconstruct_record_batch(columns, schema): """ diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index 3a0c7b5b7152f..ce50fe6a6f81d 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -181,11 +181,10 @@ def test_export_import_field(): pa.Field._import_from_c(ptr_schema) -@needs_cffi -def test_export_import_array(): +def check_export_import_array(array_type, exporter, importer): c_schema = ffi.new("struct ArrowSchema*") ptr_schema = int(ffi.cast("uintptr_t", c_schema)) - c_array = ffi.new("struct ArrowArray*") + c_array = ffi.new(f"struct {array_type}*") ptr_array = int(ffi.cast("uintptr_t", c_array)) gc.collect() # Make sure no Arrow data dangles in a ref cycle @@ -195,11 +194,11 @@ def test_export_import_array(): typ = pa.list_(pa.int32()) arr = pa.array([[1], [2, 42]], type=typ) py_value = arr.to_pylist() - arr._export_to_c(ptr_array) + exporter(arr, ptr_array) assert pa.total_allocated_bytes() > old_allocated # Delete recreate C++ object from exported pointer del arr - arr_new = pa.Array._import_from_c(ptr_array, typ) + arr_new = importer(ptr_array, typ) assert arr_new.to_pylist() == py_value assert arr_new.type == pa.list_(pa.int32()) assert pa.total_allocated_bytes() > old_allocated @@ -207,15 +206,15 @@ def test_export_import_array(): assert pa.total_allocated_bytes() == old_allocated # Now released with assert_array_released: - pa.Array._import_from_c(ptr_array, pa.list_(pa.int32())) + importer(ptr_array, pa.list_(pa.int32())) # Type is exported and imported at the same time arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32())) py_value = arr.to_pylist() - arr._export_to_c(ptr_array, ptr_schema) + exporter(arr, ptr_array, ptr_schema) # Delete and recreate C++ objects from exported pointers del arr - arr_new = pa.Array._import_from_c(ptr_array, ptr_schema) + arr_new = importer(ptr_array, ptr_schema) assert arr_new.to_pylist() == py_value assert arr_new.type == pa.list_(pa.int32()) assert pa.total_allocated_bytes() > old_allocated @@ -223,7 +222,35 @@ def test_export_import_array(): assert pa.total_allocated_bytes() == old_allocated # Now released with assert_schema_released: - pa.Array._import_from_c(ptr_array, ptr_schema) + importer(ptr_array, ptr_schema) + + +@needs_cffi +def test_export_import_array(): + check_export_import_array( + "ArrowArray", + 
+        pa.Array._export_to_c,
+        pa.Array._import_from_c,
+    )
+
+
+@needs_cffi
+def test_export_import_device_array():
+    check_export_import_array(
+        "ArrowDeviceArray",
+        pa.Array._export_to_c_device,
+        pa.Array._import_from_c_device,
+    )
+
+    # verify exported struct
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+    arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
+    arr._export_to_c_device(ptr_array)
+
+    assert c_array.device_type == 1   # ARROW_DEVICE_CPU
+    assert c_array.device_id == -1
+    assert c_array.array.length == 2
 
 
 def check_export_import_schema(schema_factory, expected_schema_factory=None):
@@ -289,10 +316,10 @@ def test_export_import_schema_float_pointer():
     assert schema_new == make_schema()
 
 
-def check_export_import_batch(batch_factory):
+def check_export_import_batch(array_type, exporter, importer, batch_factory):
     c_schema = ffi.new("struct ArrowSchema*")
     ptr_schema = int(ffi.cast("uintptr_t", c_schema))
-    c_array = ffi.new("struct ArrowArray*")
+    c_array = ffi.new(f"struct {array_type}*")
     ptr_array = int(ffi.cast("uintptr_t", c_array))
 
     gc.collect()  # Make sure no Arrow data dangles in a ref cycle
@@ -302,11 +329,11 @@ def check_export_import_batch(batch_factory):
     batch = batch_factory()
     schema = batch.schema
     py_value = batch.to_pydict()
-    batch._export_to_c(ptr_array)
+    exporter(batch, ptr_array)
    assert pa.total_allocated_bytes() > old_allocated
     # Delete and recreate C++ object from exported pointer
     del batch
-    batch_new = pa.RecordBatch._import_from_c(ptr_array, schema)
+    batch_new = importer(ptr_array, schema)
     assert batch_new.to_pydict() == py_value
     assert batch_new.schema == schema
     assert pa.total_allocated_bytes() > old_allocated
@@ -314,15 +341,15 @@ def check_export_import_batch(batch_factory):
     assert pa.total_allocated_bytes() == old_allocated
     # Now released
     with assert_array_released:
-        pa.RecordBatch._import_from_c(ptr_array, make_schema())
+        importer(ptr_array, make_schema())
 
     # Type is exported and imported at the same time
     batch = batch_factory()
     py_value = batch.to_pydict()
-    batch._export_to_c(ptr_array, ptr_schema)
+    exporter(batch, ptr_array, ptr_schema)
     # Delete and recreate C++ objects from exported pointers
     del batch
-    batch_new = pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
+    batch_new = importer(ptr_array, ptr_schema)
     assert batch_new.to_pydict() == py_value
     assert batch_new.schema == batch_factory().schema
     assert pa.total_allocated_bytes() > old_allocated
@@ -330,28 +357,57 @@ def check_export_import_batch(batch_factory):
     assert pa.total_allocated_bytes() == old_allocated
     # Now released
     with assert_schema_released:
-        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
+        importer(ptr_array, ptr_schema)
 
     # Not a struct type
     pa.int32()._export_to_c(ptr_schema)
-    batch_factory()._export_to_c(ptr_array)
+    exporter(batch_factory(), ptr_array)
     with pytest.raises(ValueError,
                        match="ArrowSchema describes non-struct type"):
-        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
+        importer(ptr_array, ptr_schema)
     # Now released
     with assert_schema_released:
-        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
+        importer(ptr_array, ptr_schema)
 
 
 @needs_cffi
 def test_export_import_batch():
-    check_export_import_batch(make_batch)
+    check_export_import_batch(
+        "ArrowArray",
+        pa.RecordBatch._export_to_c,
+        pa.RecordBatch._import_from_c,
+        make_batch,
+    )
 
 
 @needs_cffi
 def test_export_import_batch_with_extension():
     with registered_extension_type(ParamExtType(1)):
-        check_export_import_batch(make_extension_batch)
+        check_export_import_batch(
+            "ArrowArray",
+            pa.RecordBatch._export_to_c,
+            pa.RecordBatch._import_from_c,
+            make_extension_batch,
+        )
+
+
+@needs_cffi
+def test_export_import_device_batch():
+    check_export_import_batch(
+        "ArrowDeviceArray",
+        pa.RecordBatch._export_to_c_device,
+        pa.RecordBatch._import_from_c_device,
+        make_batch,
+    )
+
+    # verify exported struct
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+    batch = make_batch()
+    batch._export_to_c_device(ptr_array)
+    assert c_array.device_type == 1   # ARROW_DEVICE_CPU
+    assert c_array.device_id == -1
+    assert c_array.array.length == 2
 
 
 def _export_import_batch_reader(ptr_stream, reader_factory):
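A second sketch (also not part of the patch) illustrates what the `DefaultDeviceMapper` added in `bridge.cc` does for callers that do not supply a mapper: it rejects any device other than `ARROW_DEVICE_CPU`, so an `ArrowDeviceArray` that claims a non-CPU device should fail to import. The exact exception type is an assumption (pyarrow surfaces `Status::NotImplemented` as a `NotImplementedError` subclass), and the exported struct may be left unreleased on this error path; the snippet is for illustration only.

```python
import pyarrow as pa
from pyarrow.cffi import ffi

c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

arr = pa.array([1, 2, 3])
arr._export_to_c_device(ptr_array)
c_array.device_type = 2   # pretend the buffers live on a CUDA device

try:
    pa.Array._import_from_c_device(ptr_array, arr.type)
except NotImplementedError:
    # "Only importing data on CPU is supported" -- raised via DefaultDeviceMapper
    pass
```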