From 0707062979e99e136656bf7ebfda7f4eb722b5ce Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Wed, 20 Jul 2016 18:29:29 -0700 Subject: [PATCH] implementation of string, union, struct IPC --- cpp/CMakeLists.txt | 3 +- cpp/src/arrow/builder.h | 9 +++++ cpp/src/arrow/ipc/adapter.cc | 55 ++++++++++++++++++++++++-- cpp/src/arrow/ipc/ipc-metadata-test.cc | 11 ++++++ cpp/src/arrow/ipc/metadata-internal.cc | 41 ++++++++++++++++++- cpp/src/arrow/types/collection.h | 13 ++++++ cpp/src/arrow/types/list.h | 6 ++- cpp/src/arrow/types/primitive.h | 9 +++++ cpp/src/arrow/types/union.cc | 35 ++++++++++++++++ cpp/src/arrow/types/union.h | 37 ++++++++++++++++- format/Layout.md | 6 +-- format/Message.fbs | 6 ++- 12 files changed, 217 insertions(+), 14 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 18b47599b93d0..f617bc4fe1e4b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -660,7 +660,8 @@ set(ARROW_SRCS src/arrow/util/status.cc ) -set(LIBARROW_LINKAGE "SHARED") +set(LIBARROW_LINKAGE "SHARED" CACHE STRING + "Arrow can be built as a 'STATIC' or 'SHARED' library") add_library(arrow ${LIBARROW_LINKAGE} diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 7d3f4398d73e3..e6a41df6b185a 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -129,6 +129,15 @@ class ArrayBuilder { DISALLOW_COPY_AND_ASSIGN(ArrayBuilder); }; +class NullArrayBuilder : public ArrayBuilder { +public: + explicit NullArrayBuilder(MemoryPool* pool, const TypePtr& type) : ArrayBuilder(pool, type) {} + virtual ~NullArrayBuilder() {}; + std::shared_ptr Finish() override { + return nullptr; + } +}; + } // namespace arrow #endif // ARROW_BUILDER_H_ diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 45cc288cd6b9e..8133bcf2d8f3f 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -33,6 +33,9 @@ #include "arrow/types/construct.h" #include "arrow/types/list.h" #include "arrow/types/primitive.h" +#include "arrow/types/struct.h" +#include "arrow/types/union.h" +#include "arrow/types/string.h" #include "arrow/util/buffer.h" #include "arrow/util/logging.h" #include "arrow/util/status.h" @@ -87,7 +90,7 @@ static bool IsListType(const DataType* type) { // case Type::CHAR: case Type::LIST: // see todo on common types - // case Type::STRING: + case Type::STRING: // case Type::VARCHAR: return true; default: @@ -116,14 +119,24 @@ Status VisitArray(const Array* arr, std::vector* field_nodes if (IsPrimitive(arr_type)) { const auto prim_arr = static_cast(arr); buffers->push_back(prim_arr->data()); - } else if (IsListType(arr_type)) { + } else if (IsListType(arr_type) || arr->type()->type == Type::STRING) { const auto list_arr = static_cast(arr); buffers->push_back(list_arr->offset_buffer()); RETURN_NOT_OK(VisitArray( list_arr->values().get(), field_nodes, buffers, max_recursion_depth - 1)); } else if (arr->type_enum() == Type::STRUCT) { // TODO(wesm) - return Status::NotImplemented("Struct type"); + const auto struct_arr = static_cast(arr); + for (auto& field_arr : struct_arr->fields()) { + RETURN_NOT_OK(VisitArray(field_arr.get(), field_nodes, buffers, max_recursion_depth - 1)); + } + } else if (arr->type_enum() == Type::DENSE_UNION) { + const auto union_arr = static_cast(arr); + buffers->push_back(union_arr->types()); + buffers->push_back(union_arr->offset_buf()); + for (auto& child_arr : union_arr->children()) { + RETURN_NOT_OK(VisitArray(child_arr.get(), field_nodes, buffers, max_recursion_depth - 1)); + } } else { return Status::NotImplemented("Unrecognized type"); } @@ -158,7 +171,7 @@ class RowBatchWriter { // requirements of buffers but capacity always should. size = buffer->capacity(); // check that padding is appropriate - RETURN_NOT_OK(CheckMultipleOf64(size)); + RETURN_NOT_OK(CheckMultipleOf64(size)); // TODO(pcm): put this in again } // TODO(wesm): We currently have no notion of shared memory page id's, // but we've included it in the metadata IDL for when we have it in the @@ -316,6 +329,40 @@ class RowBatchReader::Impl { NextArray(type->child(0).get(), max_recursion_depth - 1, &values_array)); return MakeListArray(type, field_meta.length, offsets, values_array, field_meta.null_count, null_bitmap, out); + } else if (type->type == Type::STRUCT) { + int32_t length = 0; + const int num_children = type->num_children(); + std::vector results; + for (int i = 0; i < num_children; ++i) { + std::shared_ptr result; + RETURN_NOT_OK(NextArray(type->child(i).get(), max_recursion_depth - 1, &result)); + DCHECK(length == 0 || length == result->length()); + length = result->length(); + results.push_back(result); + } + *out = std::make_shared(type, length, results); + return Status::OK(); + } else if (type->type == Type::DENSE_UNION) { + std::shared_ptr types; + RETURN_NOT_OK(GetBuffer(buffer_index_++, &types)); + std::shared_ptr offset_buf; + RETURN_NOT_OK(GetBuffer(buffer_index_++, &offset_buf)); + auto type2 = std::dynamic_pointer_cast(type); + if (!type2) { + return Status::Invalid("unexpected error"); + } + const int num_children = type2->num_children(); + + std::vector results; + for (int i = 0; i < num_children; ++i) { + std::shared_ptr result; + auto f = std::make_shared(std::string(""), type2->child(i), false); + RETURN_NOT_OK(NextArray(f.get(), max_recursion_depth - 1, &result)); + results.push_back(result); + } + *out = std::make_shared(type, field_meta.length, results, types, offset_buf, + field_meta.null_count, null_bitmap); + return Status::OK(); } return Status::NotImplemented("Non-primitive types not complete yet"); } diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc index 51d79cfb4c4bb..fa5a87bae0971 100644 --- a/cpp/src/arrow/ipc/ipc-metadata-test.cc +++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc @@ -21,10 +21,12 @@ #include "gtest/gtest.h" +#include "arrow/api.h" #include "arrow/ipc/metadata.h" #include "arrow/schema.h" #include "arrow/test-util.h" #include "arrow/type.h" +#include "arrow/types/union.h" #include "arrow/util/status.h" namespace arrow { @@ -94,4 +96,13 @@ TEST_F(TestSchemaMessage, NestedFields) { CheckRoundtrip(&schema); } +TEST_F(TestSchemaMessage, DenseUnion) { + auto t0 = TypePtr(new Int32Type()); + auto t1 = TypePtr(new Int64Type()); + auto u = TypePtr(new DenseUnionType(std::vector({t0, t1}))); + auto f = std::make_shared("f", u); + Schema schema({f}); + CheckRoundtrip(&schema); +} + } // namespace arrow diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 1b1d50f96eaf5..42b93b5ccf168 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -28,6 +28,7 @@ #include "arrow/ipc/Message_generated.h" #include "arrow/schema.h" #include "arrow/type.h" +#include "arrow/types/union.h" #include "arrow/util/buffer.h" #include "arrow/util/status.h" @@ -118,11 +119,20 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, } *out = std::make_shared(children[0]); return Status::OK(); + case flatbuf::Type_Str: + *out = std::make_shared(); + return Status::OK(); case flatbuf::Type_Tuple: *out = std::make_shared(children); return Status::OK(); - case flatbuf::Type_Union: - return Status::NotImplemented("Type is not implemented"); + case flatbuf::Type_Union: { + std::vector child_types; + for (auto type : children) { + child_types.push_back(type->type); + } + *out = std::make_shared(child_types); // TODO(pcm): SparseUnionType + return Status::OK(); + } default: return Status::Invalid("Unrecognized type"); } @@ -132,6 +142,10 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, static Status FieldToFlatbuffer( FBB& fbb, const std::shared_ptr& field, FieldOffset* offset); +// Forward declaration +static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr& type, + std::vector* children, flatbuf::Type* out_type, Offset* offset); + static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) { return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union(); } @@ -160,6 +174,22 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr& type return Status::OK(); } +static Status UnionToFlatbuffer(FBB& fbb, const std::shared_ptr& type, + std::vector* out_children, Offset* offset) { + auto type2 = std::dynamic_pointer_cast(type); + if (!type2) { + return Status::Invalid("unexpected error"); + } + FieldOffset field; + for (int i = 0; i < type2->num_children(); ++i) { + auto f = std::make_shared(std::string(""), type2->child(i), false); + RETURN_NOT_OK(FieldToFlatbuffer(fbb, f, &field)); + out_children->push_back(field); + } + *offset = flatbuf::CreateUnion(fbb).Union(); + return Status::OK(); +} + #define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \ *out_type = flatbuf::Type_Int; \ *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \ @@ -199,9 +229,16 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr& type, case Type::LIST: *out_type = flatbuf::Type_List; return ListToFlatbuffer(fbb, type, children, offset); + case Type::STRING: + *out_type = flatbuf::Type_Str; + *offset = flatbuf::CreateStr(fbb).Union(); + return Status::OK(); case Type::STRUCT: *out_type = flatbuf::Type_Tuple; return StructToFlatbuffer(fbb, type, children, offset); + case Type::DENSE_UNION: + *out_type = flatbuf::Type_Union; + return UnionToFlatbuffer(fbb, type, children, offset); default: *out_type = flatbuf::Type_NONE; // Make clang-tidy happy std::stringstream ss; diff --git a/cpp/src/arrow/types/collection.h b/cpp/src/arrow/types/collection.h index 1712030203fa2..8121cccb6d379 100644 --- a/cpp/src/arrow/types/collection.h +++ b/cpp/src/arrow/types/collection.h @@ -34,6 +34,19 @@ struct CollectionType : public DataType { const TypePtr& child(int i) const { return child_types_[i]; } int num_children() const { return child_types_.size(); } + + bool Equals(const DataType* other) const override { + bool equals = other && ((this == other) || ((this->type == other->type))); + auto second = dynamic_cast*>(other); + equals = equals && other && this->num_children() == second->num_children(); + if (equals) { + for (int i = 0; i < num_children(); ++i) { + // TODO(emkornfield) limit recursion + if (!child(i)->Equals(second->child(i))) { return false; } + } + } + return equals; + } }; } // namespace arrow diff --git a/cpp/src/arrow/types/list.h b/cpp/src/arrow/types/list.h index 2f6f85d66ca60..2c2a10ba627e4 100644 --- a/cpp/src/arrow/types/list.h +++ b/cpp/src/arrow/types/list.h @@ -170,7 +170,11 @@ class ListBuilder : public ArrayBuilder { Status Append(bool is_valid = true) { RETURN_NOT_OK(Reserve(1)); UnsafeAppendToBitmap(is_valid); - RETURN_NOT_OK(offset_builder_.Append(value_builder_->length())); + if (value_builder_) { + RETURN_NOT_OK(offset_builder_.Append(value_builder_->length())); + } else { + RETURN_NOT_OK(offset_builder_.Append(0)); + } return Status::OK(); } diff --git a/cpp/src/arrow/types/primitive.h b/cpp/src/arrow/types/primitive.h index f1ec417d51014..b07d9a6f3fca8 100644 --- a/cpp/src/arrow/types/primitive.h +++ b/cpp/src/arrow/types/primitive.h @@ -109,6 +109,9 @@ class PrimitiveBuilder : public ArrayBuilder { explicit PrimitiveBuilder(MemoryPool* pool, const TypePtr& type) : ArrayBuilder(pool, type), data_(nullptr) {} + explicit PrimitiveBuilder(MemoryPool* pool) + : PrimitiveBuilder(pool, std::make_shared()) {} + virtual ~PrimitiveBuilder() {} using ArrayBuilder::Advance; @@ -128,6 +131,12 @@ class PrimitiveBuilder : public ArrayBuilder { std::shared_ptr data() const { return data_; } + // Number of elements in the builder so far + int32_t num_elems() const { + // return length_ / type_traits::bytes_required(1); + return length_; + } + // Vector append // // If passed, valid_bytes is of equal length to values, and any zero byte diff --git a/cpp/src/arrow/types/union.cc b/cpp/src/arrow/types/union.cc index c891b4a5357ef..89230c5f7a3c4 100644 --- a/cpp/src/arrow/types/union.cc +++ b/cpp/src/arrow/types/union.cc @@ -26,6 +26,41 @@ namespace arrow { +bool DenseUnionArray::Equals(const std::shared_ptr& arr) const { + if (this == arr.get()) { return true; } + if (!arr) { return false; } + if (this->type_enum() != arr->type_enum()) { return false; } + if (null_count_ != arr->null_count()) { return false; } + return RangeEquals(0, length_, 0, arr); +} + +bool DenseUnionArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, + const std::shared_ptr& arr) const { + if (this == arr.get()) { return true; } + if (Type::DENSE_UNION != arr->type_enum()) { return false; } + const auto other = static_cast(arr.get()); + + int32_t i = start_idx; + int32_t o_i = other_start_idx; + for (size_t c = 0; c < other->children().size(); ++c) { + for (int32_t e = 0; e < other->children()[c]->length(); ++e) { + if (!children()[c]->RangeEquals(e, e + 1, e, other->children()[c])) { // FIXME(pcm): fix this + return false; + } + i += 1; + o_i += 1; + if (i >= end_idx) { + return true; + } + } + } + return false; // to make the compiler happy +} + +Status DenseUnionArray::Validate() const { + return Status::OK(); +} + static inline std::string format_union(const std::vector& child_types) { std::stringstream s; s << "union<"; diff --git a/cpp/src/arrow/types/union.h b/cpp/src/arrow/types/union.h index d2ee9bde04d0d..f8db327ce1225 100644 --- a/cpp/src/arrow/types/union.h +++ b/cpp/src/arrow/types/union.h @@ -25,6 +25,7 @@ #include "arrow/array.h" #include "arrow/type.h" #include "arrow/types/collection.h" +#include "arrow/types/primitive.h" namespace arrow { @@ -51,15 +52,47 @@ struct SparseUnionType : public CollectionType { }; class UnionArray : public Array { + public: + UnionArray(const TypePtr& type, int32_t length, std::vector& children, + std::shared_ptr types, + int32_t null_count = 0, std::shared_ptr null_bitmap = nullptr) + : Array(type, length, null_count, null_bitmap), types_(types) { + type_ = type; + children_ = children; + // CHECK_EQ(children.size() * sizeof(int16_t), types.size()); + } + + const std::shared_ptr& types() const { return types_; } + + const std::vector& children() const { return children_; } + + ArrayPtr child(int32_t index) const { return children_[index]; } + protected: // The data are types encoded as int16 - Buffer* types_; + std::shared_ptr types_; std::vector> children_; }; class DenseUnionArray : public UnionArray { + public: + DenseUnionArray(const TypePtr& type, int32_t length, std::vector& children, + std::shared_ptr types, std::shared_ptr offset_buf, + int32_t null_count = 0, std::shared_ptr null_bitmap = nullptr) + : UnionArray(type, length, children, types, null_count, null_bitmap) { + offset_buf_ = offset_buf; + } + + const std::shared_ptr& offset_buf() const { return offset_buf_; } + + Status Validate() const override; + + bool Equals(const std::shared_ptr& arr) const override; + bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx, + const std::shared_ptr& arr) const override; + protected: - Buffer* offset_buf_; + std::shared_ptr offset_buf_; }; class SparseUnionArray : public UnionArray {}; diff --git a/format/Layout.md b/format/Layout.md index 815c47f2c934b..d17c05460ed26 100644 --- a/format/Layout.md +++ b/format/Layout.md @@ -425,8 +425,8 @@ of overhead for each value. Its physical layout is as follows: * One child array for each relative type * Types buffer: A buffer of 8-bit signed integers, enumerated from 0 corresponding - to each type. A union with more then 127 possible types can be modeled as a - union of unions. + to each type. A union with more than 127 possible types can be modeled as a + union of unions. * Offsets buffer: A buffer of signed int32 values indicating the relative offset into the respective child array for the type in a given slot. The respective offsets for each child value array must be in order / increasing. @@ -578,7 +578,7 @@ the the types array indicates that a slot contains a different type at the index ## References -Apache Drill Documentation - [Value Vectors][6] +Apache Drill Documentation - [Value Vectors][6] [1]: https://en.wikipedia.org/wiki/Bit_numbering [2]: https://software.intel.com/en-us/articles/practical-intel-avx-optimization-on-2nd-generation-intel-core-processors diff --git a/format/Message.fbs b/format/Message.fbs index fc849eedf791a..f0c989b4c0ef2 100644 --- a/format/Message.fbs +++ b/format/Message.fbs @@ -14,6 +14,9 @@ table Tuple { table List { } +table Str { +} + enum UnionMode:int { Sparse, Dense } table Union { @@ -72,7 +75,8 @@ union Type { List, Tuple, Union, - JSONScalar + JSONScalar, + Str } /// ----------------------------------------------------------------------