GH-34971: [Format] Add non-CPU version of C Data Interface #34972

Merged · 23 commits · Jun 6, 2023 · Changes from 5 commits
198 changes: 198 additions & 0 deletions cpp/src/arrow/c/abi.h
@@ -15,10 +15,30 @@
// specific language governing permissions and limitations
// under the License.

/// \file abi.h Arrow C-Data Interface
///
/// The Arrow C-Data interface defines a very small, stable set
/// of C definitions which can be easily copied into any project's
/// source code and vendored to be used for columnar data interchange
/// in the Arrow format. For non-C/C++ languages and runtimes,
/// it should be almost as easy to translate the C definitions into
/// the corresponding C FFI declarations.
///
/// Applications and libraries can therefore work with Arrow memory
/// without necessarily using the Arrow libraries or reinventing
/// the wheel. Developers can choose between tight integration
/// with the Arrow software project or minimal integration with
/// the Arrow format only.

#pragma once

#include <stdint.h>

/// \defgroup arrow-c-data-interface Arrow C-Data Interface
/// Definitions for the C-Data Interface/C-Stream Interface.
///
/// @{

#ifdef __cplusplus
extern "C" {
#endif
@@ -65,6 +85,92 @@ struct ArrowArray {

#endif // ARROW_C_DATA_INTERFACE

#ifndef ARROW_C_DEVICE_DATA_INTERFACE
#define ARROW_C_DEVICE_DATA_INTERFACE

/// \defgroup arrow-device-types Device Types
/// These macros are compatible with the dlpack DLDeviceType values,
/// using the same value for each enum as the equivalent kDL<type>
/// from dlpack.h
///
/// To ensure predictability with the ABI we use macros instead of
/// an enum so the storage type is not compiler dependent.
///
/// @{

/// \brief DeviceType for the allocated memory
typedef int32_t ArrowDeviceType;

/// \brief CPU device, same as using ArrowArray directly
#define ARROW_DEVICE_CPU 1
/// \brief CUDA GPU Device
#define ARROW_DEVICE_CUDA 2
/// \brief Pinned CUDA CPU memory allocated by cudaMallocHost
#define ARROW_DEVICE_CUDA_HOST 3
/// \brief OpenCL Device
#define ARROW_DEVICE_OPENCL 4
/// \brief Vulkan buffer for next-gen graphics
#define ARROW_DEVICE_VULKAN 7
/// \brief Metal for Apple GPU
#define ARROW_DEVICE_METAL 8
/// \brief Verilog simulator buffer
#define ARROW_DEVICE_VPI 9
/// \brief ROCm GPUs for AMD GPUs
#define ARROW_DEVICE_ROCM 10
/// \brief Pinned ROCm CPU memory allocated by hipMallocHost
#define ARROW_DEVICE_ROCMHOST 11
/// \brief Reserved for extension
///
/// Used to quickly test extension devices; semantics
/// can differ based on the implementation.
#define ARROW_DEVICE_EXT_DEV 12
/// \brief CUDA managed/unified memory allocated by cudaMallocManaged
#define ARROW_DEVICE_CUDA_MANAGED 13
/// \brief Unified shared memory allocated on a oneAPI
/// non-partitioned device.
///
/// A call to the oneAPI runtime is required to determine the device
/// type, the USM allocation type, and the sycl context it is bound to.
#define ARROW_DEVICE_ONEAPI 14
/// \brief GPU support for next-gen WebGPU standard
#define ARROW_DEVICE_WEBGPU 15
/// \brief Qualcomm Hexagon DSP
#define ARROW_DEVICE_HEXAGON 16

/// @}

/// \brief Struct for passing an Arrow Array alongside
/// device memory information.
struct ArrowDeviceArray {
/// \brief the Allocated Array
///
/// the buffers in the array (along with the buffers of any
/// children) are what is allocated on the device.
///
/// the private_data and release callback of the arrow array
/// should contain any necessary information and structures
/// related to freeing the array according to the device it
/// is allocated on, rather than having a separate release
/// callback embedded here.
struct ArrowArray array;
/// \brief The device id to identify a specific device
/// if multiple of this type are on the system.
///
/// the semantics of the id will be hardware dependent.
int64_t device_id;
/// \brief The type of device which can access this memory.
ArrowDeviceType device_type;
/// \brief Reserved bytes for future expansion.
///
/// As non-CPU development expands we can update,
/// without ABI breaking changes. These bytes should
/// be zero'd out after allocation in order to ensure
/// safe evolution of the ABI in the future.
int64_t reserved[2];
};

#endif // ARROW_C_DEVICE_DATA_INTERFACE

#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE

@@ -106,6 +212,98 @@ struct ArrowArrayStream {

#endif // ARROW_C_STREAM_INTERFACE

#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
#define ARROW_C_DEVICE_STREAM_INTERFACE

/// \brief Equivalent to ArrowArrayStream, but for ArrowDeviceArrays.
///
/// This stream is intended to provide a stream of data on a single
/// device; if a producer wants data to be produced on multiple devices,
/// then multiple streams should be provided, one per device.
struct ArrowDeviceArrayStream {
/// \brief The device that this stream produces data on.
///
/// All ArrowDeviceArrays that are produced by this
/// stream should have the same device_type as set
/// here. The device_type needs to be provided here
/// so that consumers can provide the correct type
/// of queue_ptr when calling get_next.
ArrowDeviceType device_type;

/// \brief Callback to get the stream schema
/// (will be the same for all arrays in the stream).
///
/// If successful, the ArrowSchema must be released independently from the stream.
/// The schema should be accessible via CPU memory.
///
/// \param[in] self The ArrowDeviceArrayStream object itself
/// \param[out] out C struct to export the schema to
/// \return 0 if successful, an `errno`-compatible error code otherwise.
int (*get_schema)(struct ArrowDeviceArrayStream* self, struct ArrowSchema* out);

/// \brief Callback to get the device id for the next array.
///
/// This is necessary so that the correct stream pointer can be provided
/// to get_next.
///
/// The next call to `get_next` should provide an ArrowDeviceArray whose
/// device_id matches what is provided here, and whose device_type is the
/// same as the device_type member of this stream.
Member:

I'm not certain I follow. Isn't the ArrowDeviceArray passed to get_next an "out" parameter? Are you saying that the ArrowDeviceArray struct itself (not the buffers) needs to be allocated on the device?

Member Author:

no, I was referring to the device_id member and device_type member that get populated in the ArrowDeviceArray that is returned from get_next

Member:

This is a weird API choice, all because you want the consumer to pass its CUDA stream of choice...

Member Author:

ultimately this is a consequence of the fact that the existing frameworks and APIs don't provide any good way to manage the stream's lifetime easily which makes having the consumer pass the stream be the safest route to take.

I'm absolutely open to suggestions to make this better as long as the consumer is able to pass in the desired stream.

Member:

the existing frameworks and APIs don't provide any good way to manage the stream's lifetime easily

What do you mean by that? Would you care to give a more concrete example? For example CUDA allows you to destroy a stream:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__STREAM.html#group__CUDART__STREAM_1gfda584f1788ca983cb21c5f4d2033a62

Contributor:

There's a lot of discussion in that issue related to internal stream handling under contexts and schedulers and similar terms. It all boils down to the same discussion of many producers being unable to release or share ownership of their streams.

I think this comment does a good job of summarizing the options that were considered: dmlc/dlpack#57 (comment)

And then this comment summarizes discussion of those options: dmlc/dlpack#57 (comment)

The lifetime management of streams as defined by the Numba documentation for __cuda_array_interface__ (https://numba.readthedocs.io/en/stable/cuda/cuda_array_interface.html#streams) requires that keeping the object (typically array) that produces the __cuda_array_interface__ alive also keeps the stream alive. In most cases libraries don't associate a stream with the object since it's valid to use multiple streams with a single object.

Here's the current state of things across a handful of projects:

Contributor:

cc @rgommers @leofang @tqchen (please feel free to include anyone else) do you happen to know if there was any other discussion captured that could be linked here regarding the decision to have a consumer hand a stream to the producer?

Member:

Thanks for the pointers.

I think this comment does a good job of summarizing the options that were considered: dmlc/dlpack#57 (comment)

Yes, I read this. It looks like solution S1, which is also the one I'm proposing, is considered the most flexible (I don't understand the "harder for compilers" comment, though).

And then this comment summarizes discussion of those options: dmlc/dlpack#57 (comment)

I read this too, but it doesn't actually mention S1, for reasons I wasn't able to understand.

In most cases libraries don't associate a stream with the object since it's valid to use multiple streams with a single object.

But you have to actually synchronize on the right stream before being able to use the object, right? How does the user know which stream to synchronize on, if they didn't produce the data themselves?

Contributor @kkraus14 (Apr 11, 2023):

Yes, I read this. It looks like solution S1, which is also the one I'm proposing, is considered the most flexible (I don't understand the "harder for compilers" comment, though).

From: dmlc/dlpack#57 (comment)

It also brings extra burden to the compilers themselves. The compiler will need to generate optional synchronization code based on the streams, which is non-trivial.

I believe the compilers being referred to here are deep learning compilers like XLA which do things like kernel fusion and set up execution graphs of kernels that use streams internally to parallelize the execution of said graphs.

But you have to actually synchronize on the right stream before being able to use the object, right?

Something / someone needs to guarantee that there isn't a data race with regards to using multiple non-blocking streams, yes. That could be done with events, stream synchronization, or device synchronization.

How does the user know which stream to synchronize on, if they didn't produce the data themselves?

If you're staying within your framework / library then the expectation is for the framework / library to handle things for the user. If crossing framework / library boundaries, then the expectation is to be reliant on things like interchange protocols to handle the synchronization semantics.

Comment:

do you happen to know if there was any other discussion captured that could be linked here regarding the decision to have a consumer hand a stream to the producer

Sorry I wasn't able to respond promptly. Is the question still open?

In the case of CAI, it is required that someone handles the exporting stream's lifetime properly:

Like data, CUDA streams also have a finite lifetime. It is therefore required that a Producer exporting data on the interface with an associated stream ensures that the exported stream’s lifetime is equal to or surpasses the lifetime of the object from which the interface was exported.

and this was considered a burden when discussing the DLPack support. A few libraries like Numba, for example, had to hold the reference to the underlying stream. I believe this was the main concern for DLPack to place the requirement on the consumer instead of the producer.

///
/// \param[in] self The ArrowDeviceArrayStream object itself
/// \param[out] out_device_id Pointer to be populated with the device id,
///   must not be null
/// \return 0 if successful, an `errno`-compatible error code otherwise.
int (*get_next_device_id)(struct ArrowDeviceArrayStream* self, int64_t* out_device_id);

/// \brief Callback to get the next array
///
/// If there is no error and the returned array has been released, the stream
/// has ended. If successful, the ArrowDeviceArray must be released independently
/// from the stream.
///
/// Because different frameworks use different types to represent this, we
/// accept a void* which should then be reinterpreted into whatever the
/// appropriate type is (e.g. cudaStream_t) for use by the producer.
///
/// \param[in] self The ArrowDeviceArrayStream object itself
/// \param[in] queue_ptr The appropriate queue, stream, or
/// equivalent object for the device that the data is allocated on
/// to indicate where the consumer wants the data to be accessible.
/// If queue_ptr is NULL then the default stream (e.g. CUDA stream 0)
/// should be used to ensure that the memory is accessible from any stream.
Member:

I'm a little confused here. It sounds like I need to call get_next_device_id to determine which queue to use and then I need to pass that queue on to the call to get_next. But why? Why isn't the producer managing the queues?

If the producer controls which device id gets used (get_next_device_id seems to suggest this) then why does the consumer need to give it the queue? For example, if I were merging streams from two different devices it seems like I would do something like (apologies for the butchered pseudo-code)...

// Dummy class merging two infinite streams in an inefficient round-robin fashion
class MergedStream {
  ArrowDeviceArrayStream left_, right_;
  std::queue<ArrowDeviceArray> merged_arrays_;

  int get_next(ArrowDeviceArray* out) {
    if (merged_arrays_.empty()) {
      ArrowDeviceArray arr;
      left_.get_next(&left_, /*queue_ptr=*/nullptr, &arr);
      merged_arrays_.push(arr);
      right_.get_next(&right_, /*queue_ptr=*/nullptr, &arr);
      merged_arrays_.push(arr);
    }
    *out = merged_arrays_.front();
    merged_arrays_.pop();
    return 0;
  }
};

Contributor:

@westonpace The idea here is that the consumer of the interface provides a queue to the producer and the producer is responsible for ensuring that the data is safe to consume on the provided queue.

The reason for doing this instead of the producer returning a pointer to a queue that the data is safe to consume on is that frameworks generally manage these queues internally and don't have a mechanism to share a queue and control its lifetime over a boundary like this.

Member:

The standard "mechanism to share a queue and control its lifetime over a boundary like this" in the C Data Interface would be the release callback.

Member Author:

@pitrou the issue is that there isn't a mechanism that you could call in the release callback (to my knowledge) to cleanly control the lifetime. (@kkraus14 correct me if I'm wrong and this isn't what you meant)

Member:

That doesn't make sense, does it? How is the consumer supposed to manage the stream's lifetime if "there isn't a mechanism that you could call in the release callback to cleanly control the lifetime"?

Member:

Why wouldn't they? They can easily refcount the usage of their own CUDA streams.

Contributor:

Why wouldn't they? They can easily refcount the usage of their own CUDA streams.

I think that is making a lot of assumptions about how folks use and manage CUDA streams 😄. Again, some places use them similarly to thread pools and only control the lifetime of the pool.

I tried to dig through Tensorflow's code to figure exactly how they're managing the lifetime of their streams but I'm not confident, everything I say below may not be correct:

I guess, in theory, if they ultimately have Stream objects in use, one could be moved into the private data used by the release callback.

Member:

I tried to dig through Tensorflow's code to figure exactly how they're managing the lifetime of their streams but I'm not confident

The fact that they're handling those lifetimes should be enough to plug a refcounting mechanism (or interact with the GC, in case of a managed language). This is already necessary to manage the lifetime of data exported through the C Data Interface.

I understand that they might not have a refcounting mechanism in place already, but that's basic engineering anyway.

Contributor:

Okay. Regardless, if we take the producer-provided path, then I think it makes a lot more sense for the producer to share an Event than a Stream.

An Event can be waited on via cudaStreamWaitEvent / hipStreamWaitEvent which does a device side wait which would have minimal overhead if it's the same stream or cudaEventSynchronize / hipEventSynchronize if blocking host code is desired.

Member Author:

Since it seems we're going to take the Producer providing an event path, there isn't really a need for the get_next_device_id callback anymore, correct? Or am I missing something?

/// \param[out] out C struct where to export the Array and device info
/// \return 0 if successful, an `errno`-compatible error code otherwise.
int (*get_next)(struct ArrowDeviceArrayStream* self, const void* queue_ptr,
                struct ArrowDeviceArray* out);

/// \brief Callback to get optional detailed error information.
///
/// This must only be called if the last stream operation failed
/// with a non-0 return code.
///
/// The returned pointer is only valid until the next operation on this stream
/// (including release).
///
/// \param[in] self The ArrowDeviceArrayStream object itself
/// \return pointer to a null-terminated character array describing
/// the last error, or NULL if no description is available.
const char* (*get_last_error)(struct ArrowDeviceArrayStream* self);

/// \brief Release callback: release the stream's own resources.
///
/// Note that arrays returned by `get_next` must be individually released.
///
/// \param[in] self The ArrowDeviceArrayStream object itself
void (*release)(struct ArrowDeviceArrayStream* self);

/// \brief Opaque producer-specific data
void* private_data;
};

#endif // ARROW_C_DEVICE_STREAM_INTERFACE

#ifdef __cplusplus
}
#endif

/// @}