Skip to content

Commit

Permalink
GH-40060: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor - add support for different data types (#40359)
Browse files Browse the repository at this point in the history

### What changes are included in this PR?

- Added support for `RecordBatches` with fields of different types in the conversion `RecordBatch` → `Tensor`.
- Added detail of the constraints to the `RecordBatch.to_tensor()` docstrings, see #40064 (comment).

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* GitHub Issue: #40060

Lead-authored-by: AlenkaF <frim.alenka@gmail.com>
Co-authored-by: Alenka Frim <AlenkaF@users.noreply.github.com>
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
  • Loading branch information
3 people authored Mar 26, 2024
1 parent 32437a5 commit 434f872
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 51 deletions.
91 changes: 70 additions & 21 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
#include "arrow/type.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/unreachable.h"
#include "arrow/util/vector.h"
#include "arrow/visit_type_inline.h"

namespace arrow {

Expand Down Expand Up @@ -248,19 +250,40 @@ Result<std::shared_ptr<StructArray>> RecordBatch::ToStructArray() const {
/*offset=*/0);
}

// Type visitor that appends one column's values, cast to `Out`, into a
// destination buffer.  The output cursor is held by reference so that
// successive columns written through fresh visitor instances sharing the
// same cursor land back-to-back (column-major layout in the tensor buffer).
template <typename Out>
struct ConvertColumnsToTensorVisitor {
// Cursor into the destination buffer; advanced past every value written.
Out*& out_values;
// The column being converted; buffer 1 holds the primitive values.
const ArrayData& in_data;

template <typename T>
Status Visit(const T&) {
if constexpr (is_numeric(T::type_id)) {
using In = typename T::c_type;
// View the column's value buffer as a span of the input C type.
// NOTE(review): assumes GetSpan accounts for in_data.offset for
// sliced arrays — confirm against ArraySpan's contract.
auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);

if constexpr (std::is_same_v<In, Out>) {
// Input and output representations match: bulk byte copy.
memcpy(out_values, in_values.data(), in_values.size_bytes());
out_values += in_values.size();
} else {
// Representations differ: widen/convert element by element.
for (In in_value : in_values) {
*out_values++ = static_cast<Out>(in_value);
}
}
return Status::OK();
}
// Callers validate that every column is numeric before visiting, so a
// non-numeric type can never reach this point.
Unreachable();
}
};

// Copy the data of every column of `batch` into `out` in column-major
// (Fortran) order, converting each value to DataType's C type as needed.
//
// `out` must point to a writable buffer of at least
// sizeof(CType) * batch.num_columns() * batch.num_rows() bytes.  Every
// column is expected to be numeric with no nulls; the caller
// (RecordBatch::ToTensor) validates this before dispatching here.
template <typename DataType>
inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) {
  using CType = typename arrow::TypeTraits<DataType>::CType;
  auto* out_values = reinterpret_cast<CType*>(out);

  // Visit each column in turn; the visitor advances `out_values` past the
  // values it writes, so successive columns land contiguously.
  for (const auto& column : batch.columns()) {
    ConvertColumnsToTensorVisitor<CType> visitor{out_values, *column->data()};
    DCHECK_OK(VisitTypeInline(*column->type(), &visitor));
  }
}

Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
Expand All @@ -269,28 +292,54 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
"Conversion to Tensor for RecordBatches without columns/schema is not "
"supported.");
}
const auto& type = column(0)->type();
// Check for supported data types
if (!is_integer(type->id()) && !is_floating(type->id())) {
return Status::TypeError("DataType is not supported: ", type->ToString());
}
// Check for uniform data type
// Check for no validity bitmap of each field
for (int i = 0; i < num_columns(); ++i) {
if (column(i)->null_count() > 0) {
return Status::TypeError("Can only convert a RecordBatch with no nulls.");
}
if (column(i)->type() != type) {
return Status::TypeError("Can only convert a RecordBatch with uniform data type.");
}

// Check for supported data types and merge fields
// to get the resulting uniform data type
if (!is_integer(column(0)->type()->id()) && !is_floating(column(0)->type()->id())) {
return Status::TypeError("DataType is not supported: ",
column(0)->type()->ToString());
}
std::shared_ptr<Field> result_field = schema_->field(0);
std::shared_ptr<DataType> result_type = result_field->type();

if (num_columns() > 1) {
Field::MergeOptions options;
options.promote_integer_to_float = true;
options.promote_integer_sign = true;
options.promote_numeric_width = true;

for (int i = 1; i < num_columns(); ++i) {
if (!is_numeric(column(i)->type()->id())) {
return Status::TypeError("DataType is not supported: ",
column(i)->type()->ToString());
}

// Casting of float16 is not supported, throw an error in this case
if ((column(i)->type()->id() == Type::HALF_FLOAT ||
result_field->type()->id() == Type::HALF_FLOAT) &&
column(i)->type()->id() != result_field->type()->id()) {
return Status::NotImplemented("Casting from or to halffloat is not supported.");
}

ARROW_ASSIGN_OR_RAISE(
result_field, result_field->MergeWith(
schema_->field(i)->WithName(result_field->name()), options));
}
result_type = result_field->type();
}

// Allocate memory
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Buffer> result,
AllocateBuffer(type->bit_width() * num_columns() * num_rows(), pool));
AllocateBuffer(result_type->bit_width() * num_columns() * num_rows(), pool));
// Copy data
switch (type->id()) {
switch (result_type->id()) {
case Type::UINT8:
ConvertColumnsToTensor<UInt8Type>(*this, result->mutable_data());
break;
Expand Down Expand Up @@ -323,18 +372,18 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
ConvertColumnsToTensor<DoubleType>(*this, result->mutable_data());
break;
default:
return Status::TypeError("DataType is not supported: ", type->ToString());
return Status::TypeError("DataType is not supported: ", result_type->ToString());
}

// Construct Tensor object
const auto& fixed_width_type =
internal::checked_cast<const FixedWidthType&>(*column(0)->type());
internal::checked_cast<const FixedWidthType&>(*result_type);
std::vector<int64_t> shape = {num_rows(), num_columns()};
std::vector<int64_t> strides;
ARROW_RETURN_NOT_OK(
internal::ComputeColumnMajorStrides(fixed_width_type, shape, &strides));
ARROW_ASSIGN_OR_RAISE(auto tensor,
Tensor::Make(type, std::move(result), shape, strides));
Tensor::Make(result_type, std::move(result), shape, strides));

return tensor;
}
Expand Down
128 changes: 115 additions & 13 deletions cpp/src/arrow/record_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,37 +619,37 @@ TEST_F(TestRecordBatch, ConcatenateRecordBatches) {
ASSERT_BATCHES_EQUAL(*batch, *null_batch);
}

// Conversion to Tensor must reject non-numeric column types with a clear
// TypeError, whether the unsupported column is string or boolean.
TEST_F(TestRecordBatch, ToTensorUnsupportedType) {
  const int length = 9;

  auto f0 = field("f0", int32());
  // Unsupported data type
  auto f1 = field("f1", utf8());

  std::vector<std::shared_ptr<Field>> fields = {f0, f1};
  auto schema = ::arrow::schema(fields);

  auto a0 = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
  auto a1 = ArrayFromJSON(utf8(), R"(["a", "b", "c", "a", "b", "c", "a", "b", "c"])");

  auto batch = RecordBatch::Make(schema, length, {a0, a1});

  ASSERT_RAISES_WITH_MESSAGE(
      TypeError, "Type error: DataType is not supported: " + a1->type()->ToString(),
      batch->ToTensor());

  // Unsupported boolean data type
  auto f2 = field("f2", boolean());

  std::vector<std::shared_ptr<Field>> fields2 = {f0, f2};
  auto schema2 = ::arrow::schema(fields2);
  auto a2 = ArrayFromJSON(boolean(),
                          "[true, false, true, true, false, true, false, true, true]");
  auto batch2 = RecordBatch::Make(schema2, length, {a0, a2});

  ASSERT_RAISES_WITH_MESSAGE(
      TypeError, "Type error: DataType is not supported: " + a2->type()->ToString(),
      batch2->ToTensor());
}

TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
Expand Down Expand Up @@ -740,6 +740,108 @@ TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
CheckTensor<FloatType>(tensor, 18, shape, f_strides);
}

// Conversion of a RecordBatch with columns of DIFFERENT numeric types:
// the result tensor's type is the merged (promoted) common type.
// Covers: single column (no promotion), uint16+int16 -> int32, and
// uint16+int16+float32 -> float64 (including NaN handling).
TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) {
const int length = 9;

auto f0 = field("f0", uint16());
auto f1 = field("f1", int16());
auto f2 = field("f2", float32());

auto a0 = ArrayFromJSON(uint16(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
auto a1 = ArrayFromJSON(int16(), "[10, 20, 30, 40, 50, 60, 70, 80, 90]");
auto a2 = ArrayFromJSON(float32(), "[100, 200, 300, NaN, 500, 600, 700, 800, 900]");

// Single column
std::vector<std::shared_ptr<Field>> fields = {f0};
auto schema = ::arrow::schema(fields);
auto batch = RecordBatch::Make(schema, length, {a0});

ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor());
ASSERT_OK(tensor->Validate());

// A lone uint16 column needs no promotion: expect a 9x1 uint16 tensor
// with column-major (Fortran) strides.
std::vector<int64_t> shape = {9, 1};
const int64_t uint16_size = sizeof(uint16_t);
std::vector<int64_t> f_strides = {uint16_size, uint16_size * shape[0]};
std::shared_ptr<Tensor> tensor_expected =
TensorFromJSON(uint16(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]", shape, f_strides);

EXPECT_TRUE(tensor_expected->Equals(*tensor));
CheckTensor<UInt16Type>(tensor, 9, shape, f_strides);

// uint16 + int16 = int32
std::vector<std::shared_ptr<Field>> fields1 = {f0, f1};
auto schema1 = ::arrow::schema(fields1);
auto batch1 = RecordBatch::Make(schema1, length, {a0, a1});

ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor());
ASSERT_OK(tensor1->Validate());

// Expected values are the concatenated columns (column-major order).
std::vector<int64_t> shape1 = {9, 2};
const int64_t int32_size = sizeof(int32_t);
std::vector<int64_t> f_strides_1 = {int32_size, int32_size * shape1[0]};
std::shared_ptr<Tensor> tensor_expected_1 = TensorFromJSON(
int32(), "[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90]",
shape1, f_strides_1);

EXPECT_TRUE(tensor_expected_1->Equals(*tensor1));

CheckTensor<Int32Type>(tensor1, 18, shape1, f_strides_1);

// The promoted type must have the full 32-bit width, not just equal ids.
ASSERT_EQ(tensor1->type()->bit_width(), tensor_expected_1->type()->bit_width());

// Spot-check element addressing: {row, column} indexing into the tensor.
ASSERT_EQ(1, tensor_expected_1->Value<Int32Type>({0, 0}));
ASSERT_EQ(2, tensor_expected_1->Value<Int32Type>({1, 0}));
ASSERT_EQ(10, tensor_expected_1->Value<Int32Type>({0, 1}));

// uint16 + int16 + float32 = float64
std::vector<std::shared_ptr<Field>> fields2 = {f0, f1, f2};
auto schema2 = ::arrow::schema(fields2);
auto batch2 = RecordBatch::Make(schema2, length, {a0, a1, a2});

ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor());
ASSERT_OK(tensor2->Validate());

std::vector<int64_t> shape2 = {9, 3};
const int64_t f64_size = sizeof(double);
std::vector<int64_t> f_strides_2 = {f64_size, f64_size * shape2[0]};
std::shared_ptr<Tensor> tensor_expected_2 =
TensorFromJSON(float64(),
"[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, "
"60, 70, 80, 90, 100, 200, 300, NaN, 500, 600, 700, 800, 900]",
shape2, f_strides_2);

// NaN != NaN under default equality, so plain Equals must fail and
// Equals with nans_equal(true) must succeed.
EXPECT_FALSE(tensor_expected_2->Equals(*tensor2));
EXPECT_TRUE(tensor_expected_2->Equals(*tensor2, EqualOptions().nans_equal(true)));

CheckTensor<DoubleType>(tensor2, 27, shape2, f_strides_2);
}

// Mixing float16 with any other numeric type must fail: the field-merge
// step cannot cast from or to halffloat, regardless of column order.
TEST_F(TestRecordBatch, ToTensorUnsupportedMixedFloat16) {
  const int kLength = 9;

  auto half_field = field("f0", float16());
  auto double_field = field("f1", float64());

  auto half_array = ArrayFromJSON(float16(), "[1, 2, 3, 4, 5, 6, 7, 8, 9]");
  auto double_array = ArrayFromJSON(float64(), "[10, 20, 30, 40, 50, 60, 70, 80, 90]");

  // halffloat column first
  auto half_first_schema = ::arrow::schema({half_field, double_field});
  auto half_first_batch =
      RecordBatch::Make(half_first_schema, kLength, {half_array, double_array});

  ASSERT_RAISES_WITH_MESSAGE(
      NotImplemented, "NotImplemented: Casting from or to halffloat is not supported.",
      half_first_batch->ToTensor());

  // halffloat column last: same error, order must not matter
  auto half_last_schema = ::arrow::schema({double_field, half_field});
  auto half_last_batch =
      RecordBatch::Make(half_last_schema, kLength, {double_array, half_array});

  ASSERT_RAISES_WITH_MESSAGE(
      NotImplemented, "NotImplemented: Casting from or to halffloat is not supported.",
      half_last_batch->ToTensor());
}

template <typename DataType>
class TestBatchToTensor : public ::testing::Test {};

Expand Down
3 changes: 3 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3392,6 +3392,9 @@ cdef class RecordBatch(_Tabular):
def to_tensor(self):
"""
Convert to a :class:`~pyarrow.Tensor`.
RecordBatches that can be converted have fields of type signed or unsigned
integer or float, including all bit-widths, with no validity bitmask.
"""
cdef:
shared_ptr[CRecordBatch] c_record_batch
Expand Down
Loading

0 comments on commit 434f872

Please sign in to comment.