Skip to content

Commit

Permalink
implementation of string, union, struct IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz committed Jul 24, 2016
1 parent fab4c82 commit 0707062
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 14 deletions.
3 changes: 2 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,8 @@ set(ARROW_SRCS
src/arrow/util/status.cc
)

set(LIBARROW_LINKAGE "SHARED")
set(LIBARROW_LINKAGE "SHARED" CACHE STRING
"Arrow can be built as a 'STATIC' or 'SHARED' library")

add_library(arrow
${LIBARROW_LINKAGE}
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ class ArrayBuilder {
DISALLOW_COPY_AND_ASSIGN(ArrayBuilder);
};

// Builder that produces no array: construction forwards the pool and type to
// ArrayBuilder, and Finish() always hands back a null pointer.
class NullArrayBuilder : public ArrayBuilder {
 public:
  explicit NullArrayBuilder(MemoryPool* pool, const TypePtr& type)
      : ArrayBuilder(pool, type) {}

  virtual ~NullArrayBuilder() {}

  // Always returns an empty shared_ptr; callers must check before dereferencing.
  std::shared_ptr<Array> Finish() override { return std::shared_ptr<Array>(); }
};

} // namespace arrow

#endif // ARROW_BUILDER_H_
55 changes: 51 additions & 4 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#include "arrow/types/construct.h"
#include "arrow/types/list.h"
#include "arrow/types/primitive.h"
#include "arrow/types/struct.h"
#include "arrow/types/union.h"
#include "arrow/types/string.h"
#include "arrow/util/buffer.h"
#include "arrow/util/logging.h"
#include "arrow/util/status.h"
Expand Down Expand Up @@ -87,7 +90,7 @@ static bool IsListType(const DataType* type) {
// case Type::CHAR:
case Type::LIST:
// see todo on common types
// case Type::STRING:
case Type::STRING:
// case Type::VARCHAR:
return true;
default:
Expand Down Expand Up @@ -116,14 +119,24 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
if (IsPrimitive(arr_type)) {
const auto prim_arr = static_cast<const PrimitiveArray*>(arr);
buffers->push_back(prim_arr->data());
} else if (IsListType(arr_type)) {
} else if (IsListType(arr_type) || arr->type()->type == Type::STRING) {
const auto list_arr = static_cast<const ListArray*>(arr);
buffers->push_back(list_arr->offset_buffer());
RETURN_NOT_OK(VisitArray(
list_arr->values().get(), field_nodes, buffers, max_recursion_depth - 1));
} else if (arr->type_enum() == Type::STRUCT) {
// TODO(wesm)
return Status::NotImplemented("Struct type");
const auto struct_arr = static_cast<const StructArray*>(arr);
for (auto& field_arr : struct_arr->fields()) {
RETURN_NOT_OK(VisitArray(field_arr.get(), field_nodes, buffers, max_recursion_depth - 1));
}
} else if (arr->type_enum() == Type::DENSE_UNION) {
const auto union_arr = static_cast<const DenseUnionArray*>(arr);
buffers->push_back(union_arr->types());
buffers->push_back(union_arr->offset_buf());
for (auto& child_arr : union_arr->children()) {
RETURN_NOT_OK(VisitArray(child_arr.get(), field_nodes, buffers, max_recursion_depth - 1));
}
} else {
return Status::NotImplemented("Unrecognized type");
}
Expand Down Expand Up @@ -158,7 +171,7 @@ class RowBatchWriter {
// requirements of buffers but capacity always should.
size = buffer->capacity();
// check that padding is appropriate
RETURN_NOT_OK(CheckMultipleOf64(size));
RETURN_NOT_OK(CheckMultipleOf64(size)); // TODO(pcm): put this in again
}
// TODO(wesm): We currently have no notion of shared memory page id's,
// but we've included it in the metadata IDL for when we have it in the
Expand Down Expand Up @@ -316,6 +329,40 @@ class RowBatchReader::Impl {
NextArray(type->child(0).get(), max_recursion_depth - 1, &values_array));
return MakeListArray(type, field_meta.length, offsets, values_array,
field_meta.null_count, null_bitmap, out);
} else if (type->type == Type::STRUCT) {
int32_t length = 0;
const int num_children = type->num_children();
std::vector<ArrayPtr> results;
for (int i = 0; i < num_children; ++i) {
std::shared_ptr<Array> result;
RETURN_NOT_OK(NextArray(type->child(i).get(), max_recursion_depth - 1, &result));
DCHECK(length == 0 || length == result->length());
length = result->length();
results.push_back(result);
}
*out = std::make_shared<StructArray>(type, length, results);
return Status::OK();
} else if (type->type == Type::DENSE_UNION) {
std::shared_ptr<Buffer> types;
RETURN_NOT_OK(GetBuffer(buffer_index_++, &types));
std::shared_ptr<Buffer> offset_buf;
RETURN_NOT_OK(GetBuffer(buffer_index_++, &offset_buf));
auto type2 = std::dynamic_pointer_cast<DenseUnionType>(type);
if (!type2) {
return Status::Invalid("unexpected error");
}
const int num_children = type2->num_children();

std::vector<ArrayPtr> results;
for (int i = 0; i < num_children; ++i) {
std::shared_ptr<Array> result;
auto f = std::make_shared<Field>(std::string(""), type2->child(i), false);
RETURN_NOT_OK(NextArray(f.get(), max_recursion_depth - 1, &result));
results.push_back(result);
}
*out = std::make_shared<DenseUnionArray>(type, field_meta.length, results, types, offset_buf,
field_meta.null_count, null_bitmap);
return Status::OK();
}
return Status::NotImplemented("Non-primitive types not complete yet");
}
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/ipc/ipc-metadata-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

#include "gtest/gtest.h"

#include "arrow/api.h"
#include "arrow/ipc/metadata.h"
#include "arrow/schema.h"
#include "arrow/test-util.h"
#include "arrow/type.h"
#include "arrow/types/union.h"
#include "arrow/util/status.h"

namespace arrow {
Expand Down Expand Up @@ -94,4 +96,13 @@ TEST_F(TestSchemaMessage, NestedFields) {
CheckRoundtrip(&schema);
}

// Builds a schema with a single field "f" whose type is a dense union of
// int32 and int64, then hands it to the fixture's CheckRoundtrip.
TEST_F(TestSchemaMessage, DenseUnion) {
  std::vector<TypePtr> member_types;
  member_types.push_back(TypePtr(new Int32Type()));
  member_types.push_back(TypePtr(new Int64Type()));

  TypePtr union_type(new DenseUnionType(member_types));
  std::shared_ptr<Field> union_field = std::make_shared<Field>("f", union_type);

  Schema schema({union_field});
  CheckRoundtrip(&schema);
}

} // namespace arrow
41 changes: 39 additions & 2 deletions cpp/src/arrow/ipc/metadata-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/ipc/Message_generated.h"
#include "arrow/schema.h"
#include "arrow/type.h"
#include "arrow/types/union.h"
#include "arrow/util/buffer.h"
#include "arrow/util/status.h"

Expand Down Expand Up @@ -118,11 +119,20 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
}
*out = std::make_shared<ListType>(children[0]);
return Status::OK();
case flatbuf::Type_Str:
*out = std::make_shared<StringType>();
return Status::OK();
case flatbuf::Type_Tuple:
*out = std::make_shared<StructType>(children);
return Status::OK();
case flatbuf::Type_Union:
return Status::NotImplemented("Type is not implemented");
case flatbuf::Type_Union: {
std::vector<TypePtr> child_types;
for (auto type : children) {
child_types.push_back(type->type);
}
*out = std::make_shared<DenseUnionType>(child_types); // TODO(pcm): SparseUnionType
return Status::OK();
}
default:
return Status::Invalid("Unrecognized type");
}
Expand All @@ -132,6 +142,10 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
static Status FieldToFlatbuffer(
FBB& fbb, const std::shared_ptr<Field>& field, FieldOffset* offset);

// Forward declaration
static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
std::vector<FieldOffset>* children, flatbuf::Type* out_type, Offset* offset);

static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) {
return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union();
}
Expand Down Expand Up @@ -160,6 +174,22 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type
return Status::OK();
}

// Serializes a dense union type.
//
// Each child type is wrapped in a Field with an empty name, converted with
// FieldToFlatbuffer, and appended to out_children in child order. Afterwards a
// flatbuffer Union table (created with no arguments) is stored in *offset.
//
// Returns Status::Invalid when `type` is not a DenseUnionType, otherwise the
// first non-OK status reported by FieldToFlatbuffer, otherwise Status::OK().
static Status UnionToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
    std::vector<FieldOffset>* out_children, Offset* offset) {
  const auto union_type = std::dynamic_pointer_cast<DenseUnionType>(type);
  if (union_type == nullptr) { return Status::Invalid("unexpected error"); }

  for (int child_idx = 0; child_idx < union_type->num_children(); ++child_idx) {
    const auto child_field =
        std::make_shared<Field>(std::string(""), union_type->child(child_idx), false);
    FieldOffset child_offset;
    RETURN_NOT_OK(FieldToFlatbuffer(fbb, child_field, &child_offset));
    out_children->push_back(child_offset);
  }

  *offset = flatbuf::CreateUnion(fbb).Union();
  return Status::OK();
}

#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \
*out_type = flatbuf::Type_Int; \
*offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
Expand Down Expand Up @@ -199,9 +229,16 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
case Type::LIST:
*out_type = flatbuf::Type_List;
return ListToFlatbuffer(fbb, type, children, offset);
case Type::STRING:
*out_type = flatbuf::Type_Str;
*offset = flatbuf::CreateStr(fbb).Union();
return Status::OK();
case Type::STRUCT:
*out_type = flatbuf::Type_Tuple;
return StructToFlatbuffer(fbb, type, children, offset);
case Type::DENSE_UNION:
*out_type = flatbuf::Type_Union;
return UnionToFlatbuffer(fbb, type, children, offset);
default:
*out_type = flatbuf::Type_NONE; // Make clang-tidy happy
std::stringstream ss;
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/types/collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ struct CollectionType : public DataType {
const TypePtr& child(int i) const { return child_types_[i]; }

int num_children() const { return child_types_.size(); }

// Structural equality for collection types.
//
// Two collection types are equal when they carry the same type id, have the
// same number of children, and every pair of corresponding children compares
// equal. A null `other`, or an `other` that is not a CollectionType<T>, is
// never equal.
bool Equals(const DataType* other) const override {
  if (other == nullptr) { return false; }
  if (this == other) { return true; }
  if (this->type != other->type) { return false; }

  // Matching type ids should imply a matching dynamic type, but guard the
  // downcast anyway so that a mismatch cannot turn into a null dereference.
  const auto* that = dynamic_cast<const CollectionType<T>*>(other);
  if (that == nullptr) { return false; }
  if (this->num_children() != that->num_children()) { return false; }

  for (int i = 0; i < num_children(); ++i) {
    // TODO(emkornfield) limit recursion
    if (!child(i)->Equals(that->child(i))) { return false; }
  }
  return true;
}
};

} // namespace arrow
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/types/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ class ListBuilder : public ArrayBuilder {
// Starts a new list slot.
//
// Reserves room for one more element, records `is_valid` in the validity
// bitmap, and appends the offset at which the new slot's values begin: the
// current length of the value builder, or 0 when no value builder is set.
// Exactly one offset is appended per call.
//
// Returns the first non-OK status from Reserve or from the offset append.
Status Append(bool is_valid = true) {
  RETURN_NOT_OK(Reserve(1));
  UnsafeAppendToBitmap(is_valid);
  // value_builder_ may be unset; never dereference it without checking.
  const int32_t next_offset = value_builder_ ? value_builder_->length() : 0;
  RETURN_NOT_OK(offset_builder_.Append<int32_t>(next_offset));
  return Status::OK();
}

Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/types/primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class PrimitiveBuilder : public ArrayBuilder {
explicit PrimitiveBuilder(MemoryPool* pool, const TypePtr& type)
: ArrayBuilder(pool, type), data_(nullptr) {}

// Convenience constructor: delegates to the (pool, type) constructor, passing
// a freshly default-constructed Type wrapped in a shared_ptr.
explicit PrimitiveBuilder(MemoryPool* pool)
: PrimitiveBuilder(pool, std::make_shared<Type>()) {}

virtual ~PrimitiveBuilder() {}

using ArrayBuilder::Advance;
Expand All @@ -128,6 +131,12 @@ class PrimitiveBuilder : public ArrayBuilder {

std::shared_ptr<Buffer> data() const { return data_; }

// Number of elements in the builder so far. Returns length_ unchanged; the
// byte-based computation below is intentionally left commented out.
int32_t num_elems() const {
// return length_ / type_traits<Type>::bytes_required(1);
return length_;
}

// Vector append
//
// If passed, valid_bytes is of equal length to values, and any zero byte
Expand Down
35 changes: 35 additions & 0 deletions cpp/src/arrow/types/union.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,41 @@

namespace arrow {

bool DenseUnionArray::Equals(const std::shared_ptr<Array>& arr) const {
if (this == arr.get()) { return true; }
if (!arr) { return false; }
if (this->type_enum() != arr->type_enum()) { return false; }
if (null_count_ != arr->null_count()) { return false; }
return RangeEquals(0, length_, 0, arr);
}

// Compares (end_idx - start_idx) elements of this array against `arr`.
//
// Returns false for a null `arr`, for an array that is not a dense union, or
// for a union with a different number of children. An empty range compares
// equal.
//
// FIXME(pcm): the comparison walks the child arrays in order, element by
// element from index 0 of each child, and does not consult the types or offset
// buffers. start_idx / other_start_idx therefore only determine how many
// elements are compared, not which ones.
bool DenseUnionArray::RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,
    const std::shared_ptr<Array>& arr) const {
  if (!arr) { return false; }
  // Same object and same window: trivially equal.
  if (this == arr.get() && start_idx == other_start_idx) { return true; }
  if (Type::DENSE_UNION != arr->type_enum()) { return false; }
  if (start_idx >= end_idx) { return true; }

  const auto other = static_cast<DenseUnionArray*>(arr.get());
  // Guard the parallel indexing below against mismatched child counts.
  if (children().size() != other->children().size()) { return false; }

  int32_t compared_until = start_idx;
  for (size_t c = 0; c < other->children().size(); ++c) {
    const auto& mine = children()[c];
    const auto& theirs = other->children()[c];
    for (int32_t e = 0; e < theirs->length(); ++e) {
      // This child ran out of elements before the other one did.
      if (e >= mine->length()) { return false; }
      if (!mine->RangeEquals(e, e + 1, e, theirs)) { return false; }
      compared_until += 1;
      if (compared_until >= end_idx) { return true; }
    }
  }
  // Fewer child elements than the requested range: treat as unequal.
  return false;
}

// Validation stub: performs no checks and always reports Status::OK().
Status DenseUnionArray::Validate() const {
return Status::OK();
}

static inline std::string format_union(const std::vector<TypePtr>& child_types) {
std::stringstream s;
s << "union<";
Expand Down
37 changes: 35 additions & 2 deletions cpp/src/arrow/types/union.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/array.h"
#include "arrow/type.h"
#include "arrow/types/collection.h"
#include "arrow/types/primitive.h"

namespace arrow {

Expand All @@ -51,15 +52,47 @@ struct SparseUnionType : public CollectionType<Type::SPARSE_UNION> {
};

// Common base for union arrays: holds the buffer of per-slot type ids and the
// child arrays, on top of the length / null bookkeeping kept by Array.
class UnionArray : public Array {
 public:
  // `children` is copied into the array; `types` is the shared buffer of type
  // ids. `null_count` and `null_bitmap` are forwarded to Array.
  UnionArray(const TypePtr& type, int32_t length, const std::vector<ArrayPtr>& children,
      std::shared_ptr<Buffer> types, int32_t null_count = 0,
      std::shared_ptr<Buffer> null_bitmap = nullptr)
      : Array(type, length, null_count, null_bitmap), types_(types), children_(children) {
    // Presumably redundant with the Array base constructor; kept to preserve
    // the original behavior -- TODO confirm and remove.
    type_ = type;
    // CHECK_EQ(children.size() * sizeof(int16_t), types.size());
  }

  // Buffer holding the type ids.
  const std::shared_ptr<Buffer>& types() const { return types_; }

  // All child arrays, in child order.
  const std::vector<ArrayPtr>& children() const { return children_; }

  // Child array at `index`; no bounds checking is performed.
  ArrayPtr child(int32_t index) const { return children_[index]; }

 protected:
  // The data are types encoded as int16
  std::shared_ptr<Buffer> types_;
  std::vector<std::shared_ptr<Array>> children_;
};

// Union array that, in addition to the type-id buffer kept by UnionArray,
// carries a shared offset buffer.
class DenseUnionArray : public UnionArray {
 public:
  // All arguments except `offset_buf` are forwarded to UnionArray;
  // `offset_buf` is stored and exposed through offset_buf().
  DenseUnionArray(const TypePtr& type, int32_t length, std::vector<ArrayPtr>& children,
      std::shared_ptr<Buffer> types, std::shared_ptr<Buffer> offset_buf,
      int32_t null_count = 0, std::shared_ptr<Buffer> null_bitmap = nullptr)
      : UnionArray(type, length, children, types, null_count, null_bitmap),
        offset_buf_(offset_buf) {}

  // Buffer of offsets supplied at construction.
  const std::shared_ptr<Buffer>& offset_buf() const { return offset_buf_; }

  Status Validate() const override;

  bool Equals(const std::shared_ptr<Array>& arr) const override;
  bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,
      const std::shared_ptr<Array>& arr) const override;

 protected:
  // Shared ownership of the offset buffer (single declaration; a raw
  // Buffer* member of the same name must not coexist with it).
  std::shared_ptr<Buffer> offset_buf_;
};

// Placeholder for the sparse union array: adds no members or constructors of
// its own on top of UnionArray.
class SparseUnionArray : public UnionArray {};
Expand Down
Loading

2 comments on commit 0707062

@julienledem
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pcmoritz I think you should submit a PR. Just mention at the top if you consider this a prototype rather than production-ready code, to manage reviewers' expectations.

See:
https://issues.apache.org/jira/browse/ARROW-257

CC/ @wesm @xhochy

@wesm
Copy link

@wesm wesm commented on 0707062 Sep 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely, can you submit a PR? May require a rebase. cc also @emkornfield

Please sign in to comment.