diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index f803c0fb3e428..b38f91e5d687c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -565,7 +565,7 @@ if (${CLANG_TIDY_FOUND}) `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc | sed -e '/_generated/g'`) # runs clang-tidy and exits with a non-zero exit code if any errors are found. add_custom_target(check-clang-tidy ${BUILD_SUPPORT_DIR}/run-clang-tidy.sh ${CLANG_TIDY_BIN} ${CMAKE_BINARY_DIR}/compile_commands.json - 0 `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc | sed -e '/_generated/g'`) + 0 `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc |grep -v -F -f ${CMAKE_CURRENT_SOURCE_DIR}/src/.clang-tidy-ignore | sed -e '/_generated/g'`) endif() diff --git a/cpp/README.md b/cpp/README.md index 3f5da21b7d417..c8cd86fedc6fe 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -76,4 +76,11 @@ build failures by running the following checks before submitting your pull reque Note that the clang-tidy target may take a while to run. You might consider running clang-tidy separately on the files you have added/changed before -invoking the make target to reduce iteration time. +invoking the make target to reduce iteration time. Also, it might generate warnings +that aren't valid. To avoid these you can use add a line comment `// NOLINT`. If +NOLINT doesn't suppress the warnings, you add the file in question to +the .clang-tidy-ignore file. This will allow `make check-clang-tidy` to pass in +travis-CI (but still surface the potential warnings in `make clang-tidy`). Ideally, +both of these options would be used rarely. Current known uses-cases whent hey are required: + +* Parameterized tests in google test. diff --git a/cpp/src/.clang-tidy-ignore b/cpp/src/.clang-tidy-ignore new file mode 100644 index 0000000000000..a128c38889672 --- /dev/null +++ b/cpp/src/.clang-tidy-ignore @@ -0,0 +1 @@ +ipc-adapter-test.cc diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc index a1536861a20be..c6b9b1599cdd2 100644 --- a/cpp/src/arrow/array.cc +++ b/cpp/src/arrow/array.cc @@ -20,6 +20,7 @@ #include #include "arrow/util/buffer.h" +#include "arrow/util/status.h" namespace arrow { @@ -47,6 +48,10 @@ bool Array::EqualsExact(const Array& other) const { return true; } +Status Array::Validate() const { + return Status::OK(); +} + bool NullArray::Equals(const std::shared_ptr& arr) const { if (this == arr.get()) { return true; } if (Type::NA != arr->type_enum()) { return false; } diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index c6735f87d8f42..f98c4c28310f8 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -28,6 +28,7 @@ namespace arrow { class Buffer; +class Status; // Immutable data array with some logical type and some length. Any memory is // owned by the respective Buffer instance (or its parents). @@ -39,7 +40,7 @@ class Array { Array(const std::shared_ptr& type, int32_t length, int32_t null_count = 0, const std::shared_ptr& null_bitmap = nullptr); - virtual ~Array() {} + virtual ~Array() = default; // Determine if a slot is null. For inner loops. Does *not* boundscheck bool IsNull(int i) const { @@ -58,6 +59,9 @@ class Array { bool EqualsExact(const Array& arr) const; virtual bool Equals(const std::shared_ptr& arr) const = 0; + // Determines if the array is internally consistent. Defaults to always + // returning Status::OK. This can be an expensive check. + virtual Status Validate() const; protected: std::shared_ptr type_; diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index 1447078f76028..87c1219025d37 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -25,6 +25,25 @@ namespace arrow { +Status ArrayBuilder::AppendToBitmap(bool is_valid) { + if (length_ == capacity_) { + // If the capacity was not already a multiple of 2, do so here + // TODO(emkornfield) doubling isn't great default allocation practice + // see https://github.com/facebook/folly/blob/master/folly/docs/FBVector.md + // fo discussion + RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1))); + } + UnsafeAppendToBitmap(is_valid); + return Status::OK(); +} + +Status ArrayBuilder::AppendToBitmap(const uint8_t* valid_bytes, int32_t length) { + RETURN_NOT_OK(Reserve(length)); + + UnsafeAppendToBitmap(valid_bytes, length); + return Status::OK(); +} + Status ArrayBuilder::Init(int32_t capacity) { capacity_ = capacity; int32_t to_alloc = util::ceil_byte(capacity) / 8; @@ -36,6 +55,7 @@ Status ArrayBuilder::Init(int32_t capacity) { } Status ArrayBuilder::Resize(int32_t new_bits) { + if (!null_bitmap_) { return Init(new_bits); } int32_t new_bytes = util::ceil_byte(new_bits) / 8; int32_t old_bytes = null_bitmap_->size(); RETURN_NOT_OK(null_bitmap_->Resize(new_bytes)); @@ -56,10 +76,46 @@ Status ArrayBuilder::Advance(int32_t elements) { Status ArrayBuilder::Reserve(int32_t elements) { if (length_ + elements > capacity_) { + // TODO(emkornfield) power of 2 growth is potentially suboptimal int32_t new_capacity = util::next_power2(length_ + elements); return Resize(new_capacity); } return Status::OK(); } +Status ArrayBuilder::SetNotNull(int32_t length) { + RETURN_NOT_OK(Reserve(length)); + UnsafeSetNotNull(length); + return Status::OK(); +} + +void ArrayBuilder::UnsafeAppendToBitmap(bool is_valid) { + if (is_valid) { + util::set_bit(null_bitmap_data_, length_); + } else { + ++null_count_; + } + ++length_; +} + +void ArrayBuilder::UnsafeAppendToBitmap(const uint8_t* valid_bytes, int32_t length) { + if (valid_bytes == nullptr) { + UnsafeSetNotNull(length); + return; + } + for (int32_t i = 0; i < length; ++i) { + // TODO(emkornfield) Optimize for large values of length? + UnsafeAppendToBitmap(valid_bytes[i] > 0); + } +} + +void ArrayBuilder::UnsafeSetNotNull(int32_t length) { + const int32_t new_length = length + length_; + // TODO(emkornfield) Optimize for large values of length? + for (int32_t i = length_; i < new_length; ++i) { + util::set_bit(null_bitmap_data_, i); + } + length_ = new_length; +} + } // namespace arrow diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 21a6341ef5086..7d3f4398d73e3 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -34,7 +34,10 @@ class PoolBuffer; static constexpr int32_t MIN_BUILDER_CAPACITY = 1 << 5; -// Base class for all data array builders +// Base class for all data array builders. +// This class provides a facilities for incrementally building the null bitmap +// (see Append methods) and as a side effect the current number of slots and +// the null count. class ArrayBuilder { public: explicit ArrayBuilder(MemoryPool* pool, const TypePtr& type) @@ -46,7 +49,7 @@ class ArrayBuilder { length_(0), capacity_(0) {} - virtual ~ArrayBuilder() {} + virtual ~ArrayBuilder() = default; // For nested types. Since the objects are owned by this class instance, we // skip shared pointers and just return a raw pointer @@ -58,14 +61,27 @@ class ArrayBuilder { int32_t null_count() const { return null_count_; } int32_t capacity() const { return capacity_; } - // Allocates requires memory at this level, but children need to be - // initialized independently - Status Init(int32_t capacity); + // Append to null bitmap + Status AppendToBitmap(bool is_valid); + // Vector append. Treat each zero byte as a null. If valid_bytes is null + // assume all of length bits are valid. + Status AppendToBitmap(const uint8_t* valid_bytes, int32_t length); + // Set the next length bits to not null (i.e. valid). + Status SetNotNull(int32_t length); - // Resizes the null_bitmap array - Status Resize(int32_t new_bits); + // Allocates initial capacity requirements for the builder. In most + // cases subclasses should override and call there parent classes + // method as well. + virtual Status Init(int32_t capacity); - Status Reserve(int32_t extra_bits); + // Resizes the null_bitmap array. In most + // cases subclasses should override and call there parent classes + // method as well. + virtual Status Resize(int32_t new_bits); + + // Ensures there is enough space for adding the number of elements by checking + // capacity and calling Resize if necessary. + Status Reserve(int32_t elements); // For cases where raw data was memcpy'd into the internal buffers, allows us // to advance the length of the builder. It is your responsibility to use @@ -75,7 +91,7 @@ class ArrayBuilder { const std::shared_ptr& null_bitmap() const { return null_bitmap_; } // Creates new array object to hold the contents of the builder and transfers - // ownership of the data + // ownership of the data. This resets all variables on the builder. virtual std::shared_ptr Finish() = 0; const std::shared_ptr& type() const { return type_; } @@ -97,6 +113,18 @@ class ArrayBuilder { // Child value array builders. These are owned by this class std::vector> children_; + // + // Unsafe operations (don't check capacity/don't resize) + // + + // Append to null bitmap. + void UnsafeAppendToBitmap(bool is_valid); + // Vector append. Treat each zero byte as a nullzero. If valid_bytes is null + // assume all of length bits are valid. + void UnsafeAppendToBitmap(const uint8_t* valid_bytes, int32_t length); + // Set the next length bits to not null (i.e. valid). + void UnsafeSetNotNull(int32_t length); + private: DISALLOW_COPY_AND_ASSIGN(ArrayBuilder); }; diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 2f72c3aa8467a..bf6fa94dea7a4 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -19,17 +19,19 @@ #include #include +#include #include #include "arrow/array.h" -#include "arrow/ipc/memory.h" #include "arrow/ipc/Message_generated.h" -#include "arrow/ipc/metadata.h" +#include "arrow/ipc/memory.h" #include "arrow/ipc/metadata-internal.h" +#include "arrow/ipc/metadata.h" #include "arrow/schema.h" #include "arrow/table.h" #include "arrow/type.h" #include "arrow/types/construct.h" +#include "arrow/types/list.h" #include "arrow/types/primitive.h" #include "arrow/util/buffer.h" #include "arrow/util/logging.h" @@ -63,44 +65,70 @@ static bool IsPrimitive(const DataType* type) { } } +static bool IsListType(const DataType* type) { + DCHECK(type != nullptr); + switch (type->type) { + // TODO(emkornfield) grouping like this are used in a few places in the + // code consider using pattern like: + // http://stackoverflow.com/questions/26784685/c-macro-for-calling-function-based-on-enum-type + // + // TODO(emkornfield) Fix type systems so these are all considered lists and + // the types behave the same way? + // case Type::BINARY: + // case Type::CHAR: + case Type::LIST: + // see todo on common types + // case Type::STRING: + // case Type::VARCHAR: + return true; + default: + return false; + } +} + // ---------------------------------------------------------------------- // Row batch write path Status VisitArray(const Array* arr, std::vector* field_nodes, - std::vector>* buffers) { - if (IsPrimitive(arr->type().get())) { - const PrimitiveArray* prim_arr = static_cast(arr); - - field_nodes->push_back( - flatbuf::FieldNode(prim_arr->length(), prim_arr->null_count())); + std::vector>* buffers, int max_recursion_depth) { + if (max_recursion_depth <= 0) { return Status::Invalid("Max recursion depth reached"); } + DCHECK(arr); + DCHECK(field_nodes); + // push back all common elements + field_nodes->push_back(flatbuf::FieldNode(arr->length(), arr->null_count())); + if (arr->null_count() > 0) { + buffers->push_back(arr->null_bitmap()); + } else { + // Push a dummy zero-length buffer, not to be copied + buffers->push_back(std::make_shared(nullptr, 0)); + } - if (prim_arr->null_count() > 0) { - buffers->push_back(prim_arr->null_bitmap()); - } else { - // Push a dummy zero-length buffer, not to be copied - buffers->push_back(std::make_shared(nullptr, 0)); - } + const DataType* arr_type = arr->type().get(); + if (IsPrimitive(arr_type)) { + const auto prim_arr = static_cast(arr); buffers->push_back(prim_arr->data()); - } else if (arr->type_enum() == Type::LIST) { - // TODO(wesm) - return Status::NotImplemented("List type"); + } else if (IsListType(arr_type)) { + const auto list_arr = static_cast(arr); + buffers->push_back(list_arr->offset_buffer()); + RETURN_NOT_OK(VisitArray( + list_arr->values().get(), field_nodes, buffers, max_recursion_depth - 1)); } else if (arr->type_enum() == Type::STRUCT) { // TODO(wesm) return Status::NotImplemented("Struct type"); } - return Status::OK(); } class RowBatchWriter { public: - explicit RowBatchWriter(const RowBatch* batch) : batch_(batch) {} + RowBatchWriter(const RowBatch* batch, int max_recursion_depth) + : batch_(batch), max_recursion_depth_(max_recursion_depth) {} Status AssemblePayload() { // Perform depth-first traversal of the row-batch for (int i = 0; i < batch_->num_columns(); ++i) { const Array* arr = batch_->column(i).get(); - RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_)); + RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_)); } return Status::OK(); } @@ -111,8 +139,10 @@ class RowBatchWriter { int64_t offset = 0; for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); - int64_t size = buffer->size(); + int64_t size = 0; + // The buffer might be null if we are handling zero row lengths. + if (buffer) { size = buffer->size(); } // TODO(wesm): We currently have no notion of shared memory page id's, // but we've included it in the metadata IDL for when we have it in the // future. Use page=0 for now @@ -171,11 +201,13 @@ class RowBatchWriter { std::vector field_nodes_; std::vector buffer_meta_; std::vector> buffers_; + int max_recursion_depth_; }; -Status WriteRowBatch( - MemorySource* dst, const RowBatch* batch, int64_t position, int64_t* header_offset) { - RowBatchWriter serializer(batch); +Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position, + int64_t* header_offset, int max_recursion_depth) { + DCHECK_GT(max_recursion_depth, 0); + RowBatchWriter serializer(batch, max_recursion_depth); RETURN_NOT_OK(serializer.AssemblePayload()); return serializer.Write(dst, position, header_offset); } @@ -186,8 +218,9 @@ static constexpr int64_t INIT_METADATA_SIZE = 4096; class RowBatchReader::Impl { public: - Impl(MemorySource* source, const std::shared_ptr& metadata) - : source_(source), metadata_(metadata) { + Impl(MemorySource* source, const std::shared_ptr& metadata, + int max_recursion_depth) + : source_(source), metadata_(metadata), max_recursion_depth_(max_recursion_depth) { num_buffers_ = metadata->num_buffers(); num_flattened_fields_ = metadata->num_fields(); } @@ -203,7 +236,7 @@ class RowBatchReader::Impl { buffer_index_ = 0; for (int i = 0; i < schema->num_fields(); ++i) { const Field* field = schema->field(i).get(); - RETURN_NOT_OK(NextArray(field, &arrays[i])); + RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i])); } *out = std::make_shared(schema, metadata_->length(), arrays); @@ -213,8 +246,12 @@ class RowBatchReader::Impl { private: // Traverse the flattened record batch metadata and reassemble the // corresponding array containers - Status NextArray(const Field* field, std::shared_ptr* out) { - const std::shared_ptr& type = field->type; + Status NextArray( + const Field* field, int max_recursion_depth, std::shared_ptr* out) { + const TypePtr& type = field->type; + if (max_recursion_depth <= 0) { + return Status::Invalid("Max recursion depth reached"); + } // pop off a field if (field_index_ >= num_flattened_fields_) { @@ -226,23 +263,42 @@ class RowBatchReader::Impl { // we can skip that buffer without reading from shared memory FieldMetadata field_meta = metadata_->field(field_index_++); + // extract null_bitmap which is common to all arrays + std::shared_ptr null_bitmap; + if (field_meta.null_count == 0) { + ++buffer_index_; + } else { + RETURN_NOT_OK(GetBuffer(buffer_index_++, &null_bitmap)); + } + if (IsPrimitive(type.get())) { - std::shared_ptr null_bitmap; std::shared_ptr data; - if (field_meta.null_count == 0) { - null_bitmap = nullptr; - ++buffer_index_; - } else { - RETURN_NOT_OK(GetBuffer(buffer_index_++, &null_bitmap)); - } if (field_meta.length > 0) { RETURN_NOT_OK(GetBuffer(buffer_index_++, &data)); } else { + buffer_index_++; data.reset(new Buffer(nullptr, 0)); } return MakePrimitiveArray( type, field_meta.length, data, field_meta.null_count, null_bitmap, out); } + + if (IsListType(type.get())) { + std::shared_ptr offsets; + RETURN_NOT_OK(GetBuffer(buffer_index_++, &offsets)); + const int num_children = type->num_children(); + if (num_children != 1) { + std::stringstream ss; + ss << "Field: " << field->ToString() + << " has wrong number of children:" << num_children; + return Status::Invalid(ss.str()); + } + std::shared_ptr values_array; + RETURN_NOT_OK( + NextArray(type->child(0).get(), max_recursion_depth - 1, &values_array)); + return MakeListArray(type, field_meta.length, offsets, values_array, + field_meta.null_count, null_bitmap, out); + } return Status::NotImplemented("Non-primitive types not complete yet"); } @@ -256,12 +312,18 @@ class RowBatchReader::Impl { int field_index_; int buffer_index_; + int max_recursion_depth_; int num_buffers_; int num_flattened_fields_; }; Status RowBatchReader::Open( MemorySource* source, int64_t position, std::shared_ptr* out) { + return Open(source, position, kMaxIpcRecursionDepth, out); +} + +Status RowBatchReader::Open(MemorySource* source, int64_t position, + int max_recursion_depth, std::shared_ptr* out) { std::shared_ptr metadata; RETURN_NOT_OK(source->ReadAt(position, INIT_METADATA_SIZE, &metadata)); @@ -286,7 +348,7 @@ Status RowBatchReader::Open( std::shared_ptr batch_meta = message->GetRecordBatch(); std::shared_ptr result(new RowBatchReader()); - result->impl_.reset(new Impl(source, batch_meta)); + result->impl_.reset(new Impl(source, batch_meta, max_recursion_depth)); *out = result; return Status::OK(); diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h index d453fa05f4982..4c9a8a9d8ee39 100644 --- a/cpp/src/arrow/ipc/adapter.h +++ b/cpp/src/arrow/ipc/adapter.h @@ -38,7 +38,9 @@ class RecordBatchMessage; // ---------------------------------------------------------------------- // Write path - +// We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice round number +// TODO(emkornfield) investigate this more +constexpr int kMaxIpcRecursionDepth = 64; // Write the RowBatch (collection of equal-length Arrow arrays) to the memory // source at the indicated position // @@ -52,8 +54,8 @@ class RecordBatchMessage; // // Finally, the memory offset to the start of the metadata / data header is // returned in an out-variable -Status WriteRowBatch( - MemorySource* dst, const RowBatch* batch, int64_t position, int64_t* header_offset); +Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position, + int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth); // int64_t GetRowBatchMetadata(const RowBatch* batch); @@ -70,6 +72,9 @@ class RowBatchReader { static Status Open( MemorySource* source, int64_t position, std::shared_ptr* out); + static Status Open(MemorySource* source, int64_t position, int max_recursion_depth, + std::shared_ptr* out); + // Reassemble the row batch. A Schema is required to be able to construct the // right array containers Status GetRowBatch( diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc index fbdda77e4919c..c243cfba820cc 100644 --- a/cpp/src/arrow/ipc/ipc-adapter-test.cc +++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc @@ -18,9 +18,7 @@ #include #include #include -#include #include -#include #include #include @@ -31,6 +29,7 @@ #include "arrow/ipc/test-common.h" #include "arrow/test-util.h" +#include "arrow/types/list.h" #include "arrow/types/primitive.h" #include "arrow/util/bit-util.h" #include "arrow/util/buffer.h" @@ -40,25 +39,56 @@ namespace arrow { namespace ipc { -class TestWriteRowBatch : public ::testing::Test, public MemoryMapFixture { +// TODO(emkornfield) convert to google style kInt32, etc? +const auto INT32 = std::make_shared(); +const auto LIST_INT32 = std::make_shared(INT32); +const auto LIST_LIST_INT32 = std::make_shared(LIST_INT32); + +typedef Status MakeRowBatch(std::shared_ptr* out); + +class TestWriteRowBatch : public ::testing::TestWithParam, + public MemoryMapFixture { public: void SetUp() { pool_ = default_memory_pool(); } void TearDown() { MemoryMapFixture::TearDown(); } - void InitMemoryMap(int64_t size) { + Status RoundTripHelper(const RowBatch& batch, int memory_map_size, + std::shared_ptr* batch_result) { std::string path = "test-write-row-batch"; - MemoryMapFixture::CreateFile(path, size); - ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &mmap_)); + MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); + int64_t header_location; + RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location)); + + std::shared_ptr reader; + RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader)); + + RETURN_NOT_OK(reader->GetRowBatch(batch.schema(), batch_result)); + return Status::OK(); } protected: - MemoryPool* pool_; std::shared_ptr mmap_; + MemoryPool* pool_; }; -const auto INT32 = std::make_shared(); +TEST_P(TestWriteRowBatch, RoundTrip) { + std::shared_ptr batch; + ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue + std::shared_ptr batch_result; + ASSERT_OK(RoundTripHelper(*batch, 1 << 16, &batch_result)); + + // do checks + ASSERT_TRUE(batch->schema()->Equals(batch_result->schema())); + ASSERT_EQ(batch->num_columns(), batch_result->num_columns()) + << batch->schema()->ToString() << " result: " << batch_result->schema()->ToString(); + EXPECT_EQ(batch->num_rows(), batch_result->num_rows()); + for (int i = 0; i < batch->num_columns(); ++i) { + EXPECT_TRUE(batch->column(i)->Equals(batch_result->column(i))) + << "Idx: " << i << " Name: " << batch->column_name(i); + } +} -TEST_F(TestWriteRowBatch, IntegerRoundTrip) { +Status MakeIntRowBatch(std::shared_ptr* out) { const int length = 1000; // Make the schema @@ -67,41 +97,159 @@ TEST_F(TestWriteRowBatch, IntegerRoundTrip) { std::shared_ptr schema(new Schema({f0, f1})); // Example data + std::shared_ptr a0, a1; + MemoryPool* pool = default_memory_pool(); + RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0)); + RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1)); + out->reset(new RowBatch(schema, length, {a0, a1})); + return Status::OK(); +} - auto data = std::make_shared(pool_); - ASSERT_OK(data->Resize(length * sizeof(int32_t))); - test::rand_uniform_int(length, 0, 0, std::numeric_limits::max(), - reinterpret_cast(data->mutable_data())); +Status MakeListRowBatch(std::shared_ptr* out) { + // Make the schema + auto f0 = std::make_shared("f0", LIST_INT32); + auto f1 = std::make_shared("f1", LIST_LIST_INT32); + auto f2 = std::make_shared("f2", INT32); + std::shared_ptr schema(new Schema({f0, f1, f2})); - auto null_bitmap = std::make_shared(pool_); - int null_bytes = util::bytes_for_bits(length); - ASSERT_OK(null_bitmap->Resize(null_bytes)); - test::random_bytes(null_bytes, 0, null_bitmap->mutable_data()); + // Example data - auto a0 = std::make_shared(length, data); - auto a1 = std::make_shared( - length, data, test::bitmap_popcount(null_bitmap->data(), length), null_bitmap); + MemoryPool* pool = default_memory_pool(); + const int length = 200; + std::shared_ptr leaf_values, list_array, list_list_array, flat_array; + const bool include_nulls = true; + RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values)); + RETURN_NOT_OK( + MakeRandomListArray(leaf_values, length, include_nulls, pool, &list_array)); + RETURN_NOT_OK( + MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array)); + RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array)); + out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array})); + return Status::OK(); +} - RowBatch batch(schema, length, {a0, a1}); +Status MakeZeroLengthRowBatch(std::shared_ptr* out) { + // Make the schema + auto f0 = std::make_shared("f0", LIST_INT32); + auto f1 = std::make_shared("f1", LIST_LIST_INT32); + auto f2 = std::make_shared("f2", INT32); + std::shared_ptr schema(new Schema({f0, f1, f2})); - // TODO(wesm): computing memory requirements for a row batch - // 64k is plenty of space - InitMemoryMap(1 << 16); + // Example data + MemoryPool* pool = default_memory_pool(); + const int length = 200; + const bool include_nulls = true; + std::shared_ptr leaf_values, list_array, list_list_array, flat_array; + RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values)); + RETURN_NOT_OK(MakeRandomListArray(leaf_values, 0, include_nulls, pool, &list_array)); + RETURN_NOT_OK( + MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array)); + RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array)); + out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array})); + return Status::OK(); +} - int64_t header_location; - ASSERT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location)); +Status MakeNonNullRowBatch(std::shared_ptr* out) { + // Make the schema + auto f0 = std::make_shared("f0", LIST_INT32); + auto f1 = std::make_shared("f1", LIST_LIST_INT32); + auto f2 = std::make_shared("f2", INT32); + std::shared_ptr schema(new Schema({f0, f1, f2})); - std::shared_ptr result; - ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &result)); + // Example data + MemoryPool* pool = default_memory_pool(); + const int length = 200; + std::shared_ptr leaf_values, list_array, list_list_array, flat_array; - std::shared_ptr batch_result; - ASSERT_OK(result->GetRowBatch(schema, &batch_result)); - EXPECT_EQ(batch.num_rows(), batch_result->num_rows()); + RETURN_NOT_OK(MakeRandomInt32Array(1000, true, pool, &leaf_values)); + bool include_nulls = false; + RETURN_NOT_OK(MakeRandomListArray(leaf_values, 50, include_nulls, pool, &list_array)); + RETURN_NOT_OK( + MakeRandomListArray(list_array, 50, include_nulls, pool, &list_list_array)); + RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array)); + out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array})); + return Status::OK(); +} - for (int i = 0; i < batch.num_columns(); ++i) { - EXPECT_TRUE(batch.column(i)->Equals(batch_result->column(i))) << i - << batch.column_name(i); +Status MakeDeeplyNestedList(std::shared_ptr* out) { + const int batch_length = 5; + TypePtr type = INT32; + + MemoryPool* pool = default_memory_pool(); + ArrayPtr array; + const bool include_nulls = true; + RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array)); + for (int i = 0; i < 63; ++i) { + type = std::static_pointer_cast(std::make_shared(type)); + RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array)); + } + + auto f0 = std::make_shared("f0", type); + std::shared_ptr schema(new Schema({f0})); + std::vector arrays = {array}; + out->reset(new RowBatch(schema, batch_length, arrays)); + return Status::OK(); +} + +INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch, + ::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, &MakeNonNullRowBatch, + &MakeZeroLengthRowBatch, &MakeDeeplyNestedList)); + +class RecursionLimits : public ::testing::Test, public MemoryMapFixture { + public: + void SetUp() { pool_ = default_memory_pool(); } + void TearDown() { MemoryMapFixture::TearDown(); } + + Status WriteToMmap(int recursion_level, bool override_level, + int64_t* header_out = nullptr, std::shared_ptr* schema_out = nullptr) { + const int batch_length = 5; + TypePtr type = INT32; + ArrayPtr array; + const bool include_nulls = true; + RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array)); + for (int i = 0; i < recursion_level; ++i) { + type = std::static_pointer_cast(std::make_shared(type)); + RETURN_NOT_OK( + MakeRandomListArray(array, batch_length, include_nulls, pool_, &array)); + } + + auto f0 = std::make_shared("f0", type); + std::shared_ptr schema(new Schema({f0})); + if (schema_out != nullptr) { *schema_out = schema; } + std::vector arrays = {array}; + auto batch = std::make_shared(schema, batch_length, arrays); + + std::string path = "test-write-past-max-recursion"; + const int memory_map_size = 1 << 16; + MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_); + int64_t header_location; + int64_t* header_out_param = header_out == nullptr ? &header_location : header_out; + if (override_level) { + return WriteRowBatch( + mmap_.get(), batch.get(), 0, header_out_param, recursion_level + 1); + } else { + return WriteRowBatch(mmap_.get(), batch.get(), 0, header_out_param); + } } + + protected: + std::shared_ptr mmap_; + MemoryPool* pool_; +}; + +TEST_F(RecursionLimits, WriteLimit) { + ASSERT_RAISES(Invalid, WriteToMmap((1 << 8) + 1, false)); +} + +TEST_F(RecursionLimits, ReadLimit) { + int64_t header_location; + std::shared_ptr schema; + ASSERT_OK(WriteToMmap(64, true, &header_location, &schema)); + + std::shared_ptr reader; + ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader)); + std::shared_ptr batch_result; + ASSERT_RAISES(Invalid, reader->GetRowBatch(schema, &batch_result)); } } // namespace ipc diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc index 2b077e9792925..84cbc182cd26f 100644 --- a/cpp/src/arrow/ipc/memory.cc +++ b/cpp/src/arrow/ipc/memory.cc @@ -18,6 +18,7 @@ #include "arrow/ipc/memory.h" #include // For memory-mapping + #include #include #include diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index ad5951d17e2c0..1b1d50f96eaf5 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -17,13 +17,14 @@ #include "arrow/ipc/metadata-internal.h" -#include #include #include #include #include #include +#include "flatbuffers/flatbuffers.h" + #include "arrow/ipc/Message_generated.h" #include "arrow/schema.h" #include "arrow/type.h" diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index 779c5a30a044a..871b5bc4bf606 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -18,11 +18,12 @@ #ifndef ARROW_IPC_METADATA_INTERNAL_H #define ARROW_IPC_METADATA_INTERNAL_H -#include #include #include #include +#include "flatbuffers/flatbuffers.h" + #include "arrow/ipc/Message_generated.h" namespace arrow { diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index bcf104f0b8ba6..4fc8ec50eb716 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -17,11 +17,12 @@ #include "arrow/ipc/metadata.h" -#include #include #include #include +#include "flatbuffers/flatbuffers.h" + // Generated C++ flatbuffer IDL #include "arrow/ipc/Message_generated.h" #include "arrow/ipc/metadata-internal.h" diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 65c837dc8b141..e7dbb84d790a1 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -18,11 +18,19 @@ #ifndef ARROW_IPC_TEST_COMMON_H #define ARROW_IPC_TEST_COMMON_H +#include #include #include #include #include +#include "arrow/array.h" +#include "arrow/test-util.h" +#include "arrow/types/list.h" +#include "arrow/types/primitive.h" +#include "arrow/util/buffer.h" +#include "arrow/util/memory-pool.h" + namespace arrow { namespace ipc { @@ -41,10 +49,69 @@ class MemoryMapFixture { fclose(file); } + Status InitMemoryMap( + int64_t size, const std::string& path, std::shared_ptr* mmap) { + CreateFile(path, size); + return MemoryMappedSource::Open(path, MemorySource::READ_WRITE, mmap); + } + private: std::vector tmp_files_; }; +Status MakeRandomInt32Array( + int32_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr* array) { + std::shared_ptr data; + test::MakeRandomInt32PoolBuffer(length, pool, &data); + const auto INT32 = std::make_shared(); + Int32Builder builder(pool, INT32); + if (include_nulls) { + std::shared_ptr valid_bytes; + test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes); + RETURN_NOT_OK(builder.Append( + reinterpret_cast(data->data()), length, valid_bytes->data())); + *array = builder.Finish(); + return Status::OK(); + } + RETURN_NOT_OK(builder.Append(reinterpret_cast(data->data()), length)); + *array = builder.Finish(); + return Status::OK(); +} + +Status MakeRandomListArray(const std::shared_ptr& child_array, int num_lists, + bool include_nulls, MemoryPool* pool, std::shared_ptr* array) { + // Create the null list values + std::vector valid_lists(num_lists); + const double null_percent = include_nulls ? 0.1 : 0; + test::random_null_bytes(num_lists, null_percent, valid_lists.data()); + + // Create list offsets + const int max_list_size = 10; + + std::vector list_sizes(num_lists, 0); + std::vector offsets( + num_lists + 1, 0); // +1 so we can shift for nulls. See partial sum below. + const int seed = child_array->length(); + if (num_lists > 0) { + test::rand_uniform_int(num_lists, seed, 0, max_list_size, list_sizes.data()); + // make sure sizes are consistent with null + std::transform(list_sizes.begin(), list_sizes.end(), valid_lists.begin(), + list_sizes.begin(), + [](int32_t size, int32_t valid) { return valid == 0 ? 0 : size; }); + std::partial_sum(list_sizes.begin(), list_sizes.end(), ++offsets.begin()); + + // Force invariants + const int child_length = child_array->length(); + offsets[0] = 0; + std::replace_if(offsets.begin(), offsets.end(), + [child_length](int32_t offset) { return offset > child_length; }, child_length); + } + ListBuilder builder(pool, child_array); + RETURN_NOT_OK(builder.Append(offsets.data(), num_lists, valid_lists.data())); + *array = builder.Finish(); + return (*array)->Validate(); +} + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index 066388b4d0e23..560e28374066b 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -21,8 +21,8 @@ #include "parquet/api/schema.h" -#include "arrow/util/status.h" #include "arrow/types/decimal.h" +#include "arrow/util/status.h" using parquet::schema::Node; using parquet::schema::NodePtr; diff --git a/cpp/src/arrow/schema.cc b/cpp/src/arrow/schema.cc index a38acaa94ba56..ff3ea1990e551 100644 --- a/cpp/src/arrow/schema.cc +++ b/cpp/src/arrow/schema.cc @@ -18,8 +18,8 @@ #include "arrow/schema.h" #include -#include #include +#include #include #include "arrow/type.h" diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 538d9b233d990..2f81161d1d6d1 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -19,6 +19,7 @@ #define ARROW_TEST_UTIL_H_ #include +#include #include #include #include @@ -26,12 +27,13 @@ #include "gtest/gtest.h" -#include "arrow/type.h" #include "arrow/column.h" #include "arrow/schema.h" #include "arrow/table.h" +#include "arrow/type.h" #include "arrow/util/bit-util.h" #include "arrow/util/buffer.h" +#include "arrow/util/logging.h" #include "arrow/util/memory-pool.h" #include "arrow/util/random.h" #include "arrow/util/status.h" @@ -103,10 +105,12 @@ std::shared_ptr to_buffer(const std::vector& values) { reinterpret_cast(values.data()), values.size() * sizeof(T)); } -void random_null_bitmap(int64_t n, double pct_null, uint8_t* null_bitmap) { +// Sets approximately pct_null of the first n bytes in null_bytes to zero +// and the rest to non-zero (true) values. +void random_null_bytes(int64_t n, double pct_null, uint8_t* null_bytes) { Random rng(random_seed()); for (int i = 0; i < n; ++i) { - null_bitmap[i] = rng.NextDoubleFraction() > pct_null; + null_bytes[i] = rng.NextDoubleFraction() > pct_null; } } @@ -121,6 +125,7 @@ static inline void random_bytes(int n, uint32_t seed, uint8_t* out) { template void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) { + DCHECK(out); std::mt19937 gen(seed); std::uniform_int_distribution d(min_value, max_value); for (int i = 0; i < n; ++i) { @@ -129,11 +134,25 @@ void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) { } static inline int bitmap_popcount(const uint8_t* data, int length) { + // book keeping + constexpr int pop_len = sizeof(uint64_t); + const uint64_t* i64_data = reinterpret_cast(data); + const int fast_counts = length / pop_len; + const uint64_t* end = i64_data + fast_counts; + int count = 0; - for (int i = 0; i < length; ++i) { - // TODO(wesm): accelerate this + // popcount as much as possible with the widest possible count + for (auto iter = i64_data; iter < end; ++iter) { + count += __builtin_popcountll(*iter); + } + + // Account for left over bytes (in theory we could fall back to smaller + // versions of popcount but the code complexity is likely not worth it) + const int loop_tail_index = fast_counts * pop_len; + for (int i = loop_tail_index; i < length; ++i) { if (util::get_bit(data, i)) { ++count; } } + return count; } @@ -153,6 +172,26 @@ std::shared_ptr bytes_to_null_buffer(const std::vector& bytes) return out; } +Status MakeRandomInt32PoolBuffer(int32_t length, MemoryPool* pool, + std::shared_ptr* pool_buffer, uint32_t seed = 0) { + DCHECK(pool); + auto data = std::make_shared(pool); + RETURN_NOT_OK(data->Resize(length * sizeof(int32_t))); + test::rand_uniform_int(length, seed, 0, std::numeric_limits::max(), + reinterpret_cast(data->mutable_data())); + *pool_buffer = data; + return Status::OK(); +} + +Status MakeRandomBytePoolBuffer(int32_t length, MemoryPool* pool, + std::shared_ptr* pool_buffer, uint32_t seed = 0) { + auto bytes = std::make_shared(pool); + RETURN_NOT_OK(bytes->Resize(length)); + test::random_bytes(length, seed, bytes->mutable_data()); + *pool_buffer = bytes; + return Status::OK(); +} + } // namespace test } // namespace arrow diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 051ab46b199f9..77404cd702524 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -116,7 +116,7 @@ struct DataType { bool Equals(const DataType* other) { // Call with a pointer so more friendly to subclasses - return this == other || (this->type == other->type); + return other && ((this == other) || (this->type == other->type)); } bool Equals(const std::shared_ptr& other) { return Equals(other.get()); } diff --git a/cpp/src/arrow/types/construct.cc b/cpp/src/arrow/types/construct.cc index 0a30929b97c51..78036d4bf5711 100644 --- a/cpp/src/arrow/types/construct.cc +++ b/cpp/src/arrow/types/construct.cc @@ -20,8 +20,8 @@ #include #include "arrow/type.h" -#include "arrow/types/primitive.h" #include "arrow/types/list.h" +#include "arrow/types/primitive.h" #include "arrow/types/string.h" #include "arrow/util/buffer.h" #include "arrow/util/status.h" @@ -60,11 +60,10 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr& type, case Type::LIST: { std::shared_ptr value_builder; - const std::shared_ptr& value_type = static_cast(type.get())->value_type(); RETURN_NOT_OK(MakeBuilder(pool, value_type, &value_builder)); - out->reset(new ListBuilder(pool, type, value_builder)); + out->reset(new ListBuilder(pool, value_builder)); return Status::OK(); } default: @@ -75,11 +74,11 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr& type, #define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType) \ case Type::ENUM: \ out->reset(new ArrayType(type, length, data, null_count, null_bitmap)); \ - return Status::OK(); + break; -Status MakePrimitiveArray(const std::shared_ptr& type, int32_t length, +Status MakePrimitiveArray(const TypePtr& type, int32_t length, const std::shared_ptr& data, int32_t null_count, - const std::shared_ptr& null_bitmap, std::shared_ptr* out) { + const std::shared_ptr& null_bitmap, ArrayPtr* out) { switch (type->type) { MAKE_PRIMITIVE_ARRAY_CASE(BOOL, BooleanArray); MAKE_PRIMITIVE_ARRAY_CASE(UINT8, UInt8Array); @@ -90,11 +89,43 @@ Status MakePrimitiveArray(const std::shared_ptr& type, int32_t length, MAKE_PRIMITIVE_ARRAY_CASE(INT32, Int32Array); MAKE_PRIMITIVE_ARRAY_CASE(UINT64, UInt64Array); MAKE_PRIMITIVE_ARRAY_CASE(INT64, Int64Array); + MAKE_PRIMITIVE_ARRAY_CASE(TIME, Int64Array); + MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP, Int64Array); MAKE_PRIMITIVE_ARRAY_CASE(FLOAT, FloatArray); MAKE_PRIMITIVE_ARRAY_CASE(DOUBLE, DoubleArray); + MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP_DOUBLE, DoubleArray); + default: + return Status::NotImplemented(type->ToString()); + } +#ifdef NDEBUG + return Status::OK(); +#else + return (*out)->Validate(); +#endif +} + +Status MakeListArray(const TypePtr& type, int32_t length, + const std::shared_ptr& offsets, const ArrayPtr& values, int32_t null_count, + const std::shared_ptr& null_bitmap, ArrayPtr* out) { + switch (type->type) { + case Type::BINARY: + case Type::LIST: + out->reset(new ListArray(type, length, offsets, values, null_count, null_bitmap)); + break; + case Type::CHAR: + case Type::DECIMAL_TEXT: + case Type::STRING: + case Type::VARCHAR: + out->reset(new StringArray(type, length, offsets, values, null_count, null_bitmap)); + break; default: return Status::NotImplemented(type->ToString()); } +#ifdef NDEBUG + return Status::OK(); +#else + return (*out)->Validate(); +#endif } } // namespace arrow diff --git a/cpp/src/arrow/types/construct.h b/cpp/src/arrow/types/construct.h index 27fb7bd2149cf..43c0018c67e41 100644 --- a/cpp/src/arrow/types/construct.h +++ b/cpp/src/arrow/types/construct.h @@ -33,10 +33,19 @@ class Status; Status MakeBuilder(MemoryPool* pool, const std::shared_ptr& type, std::shared_ptr* out); +// Create new arrays for logical types that are backed by primitive arrays. Status MakePrimitiveArray(const std::shared_ptr& type, int32_t length, const std::shared_ptr& data, int32_t null_count, const std::shared_ptr& null_bitmap, std::shared_ptr* out); +// Create new list arrays for logical types that are backed by ListArrays (e.g. list of +// primitives and strings) +// TODO(emkornfield) split up string vs list? +Status MakeListArray(const std::shared_ptr& type, int32_t length, + const std::shared_ptr& offests, const std::shared_ptr& values, + int32_t null_count, const std::shared_ptr& null_bitmap, + std::shared_ptr* out); + } // namespace arrow #endif // ARROW_BUILDER_H_ diff --git a/cpp/src/arrow/types/list-test.cc b/cpp/src/arrow/types/list-test.cc index aa34f23cc0230..6a8ad9aa59ead 100644 --- a/cpp/src/arrow/types/list-test.cc +++ b/cpp/src/arrow/types/list-test.cc @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -#include #include +#include #include #include #include @@ -94,6 +94,7 @@ TEST_F(TestListBuilder, TestAppendNull) { Done(); + ASSERT_OK(result_->Validate()); ASSERT_TRUE(result_->IsNull(0)); ASSERT_TRUE(result_->IsNull(1)); @@ -105,50 +106,93 @@ TEST_F(TestListBuilder, TestAppendNull) { ASSERT_EQ(0, values->length()); } +void ValidateBasicListArray(const ListArray* result, const vector& values, + const vector& is_valid) { + ASSERT_OK(result->Validate()); + ASSERT_EQ(1, result->null_count()); + ASSERT_EQ(0, result->values()->null_count()); + + ASSERT_EQ(3, result->length()); + vector ex_offsets = {0, 3, 3, 7}; + for (size_t i = 0; i < ex_offsets.size(); ++i) { + ASSERT_EQ(ex_offsets[i], result->offset(i)); + } + + for (int i = 0; i < result->length(); ++i) { + ASSERT_EQ(!static_cast(is_valid[i]), result->IsNull(i)); + } + + ASSERT_EQ(7, result->values()->length()); + Int32Array* varr = static_cast(result->values().get()); + + for (size_t i = 0; i < values.size(); ++i) { + ASSERT_EQ(values[i], varr->Value(i)); + } +} + TEST_F(TestListBuilder, TestBasics) { vector values = {0, 1, 2, 3, 4, 5, 6}; vector lengths = {3, 0, 4}; - vector is_null = {0, 1, 0}; + vector is_valid = {1, 0, 1}; Int32Builder* vb = static_cast(builder_->value_builder().get()); - EXPECT_OK(builder_->Reserve(lengths.size())); - EXPECT_OK(vb->Reserve(values.size())); + ASSERT_OK(builder_->Reserve(lengths.size())); + ASSERT_OK(vb->Reserve(values.size())); int pos = 0; for (size_t i = 0; i < lengths.size(); ++i) { - ASSERT_OK(builder_->Append(is_null[i] > 0)); + ASSERT_OK(builder_->Append(is_valid[i] > 0)); for (int j = 0; j < lengths[i]; ++j) { vb->Append(values[pos++]); } } Done(); + ValidateBasicListArray(result_.get(), values, is_valid); +} - ASSERT_EQ(1, result_->null_count()); - ASSERT_EQ(0, result_->values()->null_count()); +TEST_F(TestListBuilder, BulkAppend) { + vector values = {0, 1, 2, 3, 4, 5, 6}; + vector lengths = {3, 0, 4}; + vector is_valid = {1, 0, 1}; + vector offsets = {0, 3, 3}; - ASSERT_EQ(3, result_->length()); - vector ex_offsets = {0, 3, 3, 7}; - for (size_t i = 0; i < ex_offsets.size(); ++i) { - ASSERT_EQ(ex_offsets[i], result_->offset(i)); - } + Int32Builder* vb = static_cast(builder_->value_builder().get()); + ASSERT_OK(vb->Reserve(values.size())); - for (int i = 0; i < result_->length(); ++i) { - ASSERT_EQ(static_cast(is_null[i]), result_->IsNull(i)); + builder_->Append(offsets.data(), offsets.size(), is_valid.data()); + for (int32_t value : values) { + vb->Append(value); } + Done(); + ValidateBasicListArray(result_.get(), values, is_valid); +} - ASSERT_EQ(7, result_->values()->length()); - Int32Array* varr = static_cast(result_->values().get()); +TEST_F(TestListBuilder, BulkAppendInvalid) { + vector values = {0, 1, 2, 3, 4, 5, 6}; + vector lengths = {3, 0, 4}; + vector is_null = {0, 1, 0}; + vector is_valid = {1, 0, 1}; + vector offsets = {0, 2, 4}; // should be 0, 3, 3 given the is_null array - for (size_t i = 0; i < values.size(); ++i) { - ASSERT_EQ(values[i], varr->Value(i)); + Int32Builder* vb = static_cast(builder_->value_builder().get()); + ASSERT_OK(vb->Reserve(values.size())); + + builder_->Append(offsets.data(), offsets.size(), is_valid.data()); + builder_->Append(offsets.data(), offsets.size(), is_valid.data()); + for (int32_t value : values) { + vb->Append(value); } + + Done(); + ASSERT_RAISES(Invalid, result_->Validate()); } TEST_F(TestListBuilder, TestZeroLength) { // All buffers are null Done(); + ASSERT_OK(result_->Validate()); } } // namespace arrow diff --git a/cpp/src/arrow/types/list.cc b/cpp/src/arrow/types/list.cc index 23f12ddc4ecd7..fc3331139c6d8 100644 --- a/cpp/src/arrow/types/list.cc +++ b/cpp/src/arrow/types/list.cc @@ -14,23 +14,26 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - #include "arrow/types/list.h" +#include + namespace arrow { bool ListArray::EqualsExact(const ListArray& other) const { if (this == &other) { return true; } if (null_count_ != other.null_count_) { return false; } - bool equal_offsets = offset_buf_->Equals(*other.offset_buf_, length_ + 1); + bool equal_offsets = + offset_buf_->Equals(*other.offset_buf_, (length_ + 1) * sizeof(int32_t)); + if (!equal_offsets) { return false; } bool equal_null_bitmap = true; if (null_count_ > 0) { equal_null_bitmap = null_bitmap_->Equals(*other.null_bitmap_, util::bytes_for_bits(length_)); } - if (!(equal_offsets && equal_null_bitmap)) { return false; } + if (!equal_null_bitmap) { return false; } return values()->Equals(other.values()); } @@ -41,4 +44,55 @@ bool ListArray::Equals(const std::shared_ptr& arr) const { return EqualsExact(*static_cast(arr.get())); } +Status ListArray::Validate() const { + if (length_ < 0) { return Status::Invalid("Length was negative"); } + if (!offset_buf_) { return Status::Invalid("offset_buf_ was null"); } + if (offset_buf_->size() / sizeof(int32_t) < length_) { + std::stringstream ss; + ss << "offset buffer size (bytes): " << offset_buf_->size() + << " isn't large enough for length: " << length_; + return Status::Invalid(ss.str()); + } + const int32_t last_offset = offset(length_); + if (last_offset > 0) { + if (!values_) { + return Status::Invalid("last offset was non-zero and values was null"); + } + if (values_->length() != last_offset) { + std::stringstream ss; + ss << "Final offset invariant not equal to values length: " << last_offset + << "!=" << values_->length(); + return Status::Invalid(ss.str()); + } + + const Status child_valid = values_->Validate(); + if (!child_valid.ok()) { + std::stringstream ss; + ss << "Child array invalid: " << child_valid.ToString(); + return Status::Invalid(ss.str()); + } + } + + int32_t prev_offset = offset(0); + if (prev_offset != 0) { return Status::Invalid("The first offset wasn't zero"); } + for (int32_t i = 1; i <= length_; ++i) { + int32_t current_offset = offset(i); + if (IsNull(i - 1) && current_offset != prev_offset) { + std::stringstream ss; + ss << "Offset invariant failure at: " << i << " inconsistent offsets for null slot" + << current_offset << "!=" << prev_offset; + return Status::Invalid(ss.str()); + } + if (current_offset < prev_offset) { + std::stringstream ss; + ss << "Offset invariant failure: " << i + << " inconsistent offset for non-null slot: " << current_offset << "<" + << prev_offset; + return Status::Invalid(ss.str()); + } + prev_offset = current_offset; + } + return Status::OK(); +} + } // namespace arrow diff --git a/cpp/src/arrow/types/list.h b/cpp/src/arrow/types/list.h index 6b815460ecb1e..e2302d917b8f6 100644 --- a/cpp/src/arrow/types/list.h +++ b/cpp/src/arrow/types/list.h @@ -28,6 +28,7 @@ #include "arrow/types/primitive.h" #include "arrow/util/bit-util.h" #include "arrow/util/buffer.h" +#include "arrow/util/logging.h" #include "arrow/util/status.h" namespace arrow { @@ -46,11 +47,16 @@ class ListArray : public Array { values_ = values; } - virtual ~ListArray() {} + Status Validate() const override; + + virtual ~ListArray() = default; // Return a shared pointer in case the requestor desires to share ownership // with this array. const std::shared_ptr& values() const { return values_; } + const std::shared_ptr offset_buffer() const { + return std::static_pointer_cast(offset_buf_); + } const std::shared_ptr& value_type() const { return values_->type(); } @@ -78,59 +84,73 @@ class ListArray : public Array { // // To use this class, you must append values to the child array builder and use // the Append function to delimit each distinct list value (once the values -// have been appended to the child array) -class ListBuilder : public Int32Builder { +// have been appended to the child array) or use the bulk API to append +// a sequence of offests and null values. +// +// A note on types. Per arrow/type.h all types in the c++ implementation are +// logical so even though this class always builds an Array of lists, this can +// represent multiple different logical types. If no logical type is provided +// at construction time, the class defaults to List where t is take from the +// value_builder/values that the object is constructed with. +class ListBuilder : public ArrayBuilder { public: + // Use this constructor to incrementally build the value array along with offsets and + // null bitmap. + ListBuilder(MemoryPool* pool, std::shared_ptr value_builder, + const TypePtr& type = nullptr) + : ArrayBuilder( + pool, type ? type : std::static_pointer_cast( + std::make_shared(value_builder->type()))), + offset_builder_(pool), + value_builder_(value_builder) {} + + // Use this constructor to build the list with a pre-existing values array ListBuilder( - MemoryPool* pool, const TypePtr& type, std::shared_ptr value_builder) - : Int32Builder(pool, type), value_builder_(value_builder) {} - - Status Init(int32_t elements) { - // One more than requested. - // - // XXX: This is slightly imprecise, because we might trigger null mask - // resizes that are unnecessary when creating arrays with power-of-two size - return Int32Builder::Init(elements + 1); + MemoryPool* pool, std::shared_ptr values, const TypePtr& type = nullptr) + : ArrayBuilder(pool, type ? type : std::static_pointer_cast( + std::make_shared(values->type()))), + offset_builder_(pool), + values_(values) {} + + Status Init(int32_t elements) override { + RETURN_NOT_OK(ArrayBuilder::Init(elements)); + // one more then requested for offsets + return offset_builder_.Resize((elements + 1) * sizeof(int32_t)); } - Status Resize(int32_t capacity) { - // Need space for the end offset - RETURN_NOT_OK(Int32Builder::Resize(capacity + 1)); - - // Slight hack, as the "real" capacity is one less - --capacity_; - return Status::OK(); + Status Resize(int32_t capacity) override { + // one more then requested for offsets + RETURN_NOT_OK(offset_builder_.Resize((capacity + 1) * sizeof(int32_t))); + return ArrayBuilder::Resize(capacity); } // Vector append // // If passed, valid_bytes is of equal length to values, and any zero byte // will be considered as a null for that slot - Status Append(value_type* values, int32_t length, uint8_t* valid_bytes = nullptr) { - if (length_ + length > capacity_) { - int32_t new_capacity = util::next_power2(length_ + length); - RETURN_NOT_OK(Resize(new_capacity)); - } - memcpy(raw_data_ + length_, values, type_traits::bytes_required(length)); - - if (valid_bytes != nullptr) { AppendNulls(valid_bytes, length); } - - length_ += length; + Status Append( + const int32_t* offsets, int32_t length, const uint8_t* valid_bytes = nullptr) { + RETURN_NOT_OK(Reserve(length)); + UnsafeAppendToBitmap(valid_bytes, length); + offset_builder_.UnsafeAppend(offsets, length); return Status::OK(); } + // The same as Finalize but allows for overridding the c++ type template std::shared_ptr Transfer() { - std::shared_ptr items = value_builder_->Finish(); + std::shared_ptr items = values_; + if (!items) { items = value_builder_->Finish(); } - // Add final offset if the length is non-zero - if (length_) { raw_data_[length_] = items->length(); } + offset_builder_.Append(items->length()); + const auto offsets_buffer = offset_builder_.Finish(); auto result = std::make_shared( - type_, length_, data_, items, null_count_, null_bitmap_); + type_, length_, offsets_buffer, items, null_count_, null_bitmap_); - data_ = null_bitmap_ = nullptr; + // TODO(emkornfield) make a reset method capacity_ = length_ = null_count_ = 0; + null_bitmap_ = nullptr; return result; } @@ -141,26 +161,24 @@ class ListBuilder : public Int32Builder { // // This function should be called before beginning to append elements to the // value builder - Status Append(bool is_null = false) { - if (length_ == capacity_) { - // If the capacity was not already a multiple of 2, do so here - RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1))); - } - if (is_null) { - ++null_count_; - } else { - util::set_bit(null_bitmap_data_, length_); - } - raw_data_[length_++] = value_builder_->length(); + Status Append(bool is_valid = true) { + RETURN_NOT_OK(Reserve(1)); + UnsafeAppendToBitmap(is_valid); + RETURN_NOT_OK(offset_builder_.Append(value_builder_->length())); return Status::OK(); } - Status AppendNull() { return Append(true); } + Status AppendNull() { return Append(false); } - const std::shared_ptr& value_builder() const { return value_builder_; } + const std::shared_ptr& value_builder() const { + DCHECK(!values_) << "Using value builder is pointless when values_ is set"; + return value_builder_; + } protected: + BufferBuilder offset_builder_; std::shared_ptr value_builder_; + std::shared_ptr values_; }; } // namespace arrow diff --git a/cpp/src/arrow/types/primitive-test.cc b/cpp/src/arrow/types/primitive-test.cc index 6bd9e73eb46ac..2b4c0879a28f4 100644 --- a/cpp/src/arrow/types/primitive-test.cc +++ b/cpp/src/arrow/types/primitive-test.cc @@ -102,7 +102,7 @@ class TestPrimitiveBuilder : public TestBuilder { Attrs::draw(N, &draws_); valid_bytes_.resize(N); - test::random_null_bitmap(N, pct_null, valid_bytes_.data()); + test::random_null_bytes(N, pct_null, valid_bytes_.data()); } void Check(const std::shared_ptr& builder, bool nullable) { @@ -193,8 +193,8 @@ void TestPrimitiveBuilder::RandomData(int N, double pct_null) { draws_.resize(N); valid_bytes_.resize(N); - test::random_null_bitmap(N, 0.5, draws_.data()); - test::random_null_bitmap(N, pct_null, valid_bytes_.data()); + test::random_null_bytes(N, 0.5, draws_.data()); + test::random_null_bytes(N, pct_null, valid_bytes_.data()); } template <> diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc index 9549c47b41157..9102c530e25da 100644 --- a/cpp/src/arrow/types/primitive.cc +++ b/cpp/src/arrow/types/primitive.cc @@ -57,12 +57,14 @@ bool PrimitiveArray::EqualsExact(const PrimitiveArray& other) const { } return true; } else { + if (length_ == 0 && other.length_ == 0) { return true; } return data_->Equals(*other.data_, length_); } } bool PrimitiveArray::Equals(const std::shared_ptr& arr) const { if (this == arr.get()) { return true; } + if (!arr) { return false; } if (this->type_enum() != arr->type_enum()) { return false; } return EqualsExact(*static_cast(arr.get())); } @@ -101,48 +103,21 @@ Status PrimitiveBuilder::Resize(int32_t capacity) { return Status::OK(); } -template -Status PrimitiveBuilder::Reserve(int32_t elements) { - if (length_ + elements > capacity_) { - int32_t new_capacity = util::next_power2(length_ + elements); - return Resize(new_capacity); - } - return Status::OK(); -} - template Status PrimitiveBuilder::Append( const value_type* values, int32_t length, const uint8_t* valid_bytes) { - RETURN_NOT_OK(PrimitiveBuilder::Reserve(length)); + RETURN_NOT_OK(Reserve(length)); if (length > 0) { memcpy(raw_data_ + length_, values, type_traits::bytes_required(length)); } - if (valid_bytes != nullptr) { - PrimitiveBuilder::AppendNulls(valid_bytes, length); - } else { - for (int i = 0; i < length; ++i) { - util::set_bit(null_bitmap_data_, length_ + i); - } - } + // length_ is update by these + ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length); - length_ += length; return Status::OK(); } -template -void PrimitiveBuilder::AppendNulls(const uint8_t* valid_bytes, int32_t length) { - // If valid_bytes is all not null, then none of the values are null - for (int i = 0; i < length; ++i) { - if (valid_bytes[i] == 0) { - ++null_count_; - } else { - util::set_bit(null_bitmap_data_, length_ + i); - } - } -} - template std::shared_ptr PrimitiveBuilder::Finish() { std::shared_ptr result = std::make_shared::ArrayType>( @@ -166,14 +141,8 @@ Status PrimitiveBuilder::Append( } } - if (valid_bytes != nullptr) { - PrimitiveBuilder::AppendNulls(valid_bytes, length); - } else { - for (int i = 0; i < length; ++i) { - util::set_bit(null_bitmap_data_, length_ + i); - } - } - length_ += length; + // this updates length_ + ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length); return Status::OK(); } diff --git a/cpp/src/arrow/types/primitive.h b/cpp/src/arrow/types/primitive.h index fcd3db4e96e53..6f6b2fed5a320 100644 --- a/cpp/src/arrow/types/primitive.h +++ b/cpp/src/arrow/types/primitive.h @@ -95,15 +95,13 @@ class PrimitiveBuilder : public ArrayBuilder { using ArrayBuilder::Advance; // Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory - void AppendNulls(const uint8_t* valid_bytes, int32_t length); + void AppendNulls(const uint8_t* valid_bytes, int32_t length) { + UnsafeAppendToBitmap(valid_bytes, length); + } Status AppendNull() { - if (length_ == capacity_) { - // If the capacity was not already a multiple of 2, do so here - RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1))); - } - ++null_count_; - ++length_; + RETURN_NOT_OK(Reserve(1)); + UnsafeAppendToBitmap(false); return Status::OK(); } @@ -116,21 +114,17 @@ class PrimitiveBuilder : public ArrayBuilder { Status Append( const value_type* values, int32_t length, const uint8_t* valid_bytes = nullptr); - // Ensure that builder can accommodate an additional number of - // elements. Resizes if the current capacity is not sufficient - Status Reserve(int32_t elements); - std::shared_ptr Finish() override; - protected: - std::shared_ptr data_; - value_type* raw_data_; - - Status Init(int32_t capacity); + Status Init(int32_t capacity) override; // Increase the capacity of the builder to accommodate at least the indicated // number of elements - Status Resize(int32_t capacity); + Status Resize(int32_t capacity) override; + + protected: + std::shared_ptr data_; + value_type* raw_data_; }; template @@ -140,9 +134,17 @@ class NumericBuilder : public PrimitiveBuilder { using PrimitiveBuilder::PrimitiveBuilder; using PrimitiveBuilder::Append; + using PrimitiveBuilder::Init; + using PrimitiveBuilder::Resize; - // Scalar append. Does not capacity-check; make sure to call Reserve beforehand + // Scalar append. void Append(value_type val) { + ArrayBuilder::Reserve(1); + UnsafeAppend(val); + } + + // Does not capacity-check; make sure to call Reserve beforehand + void UnsafeAppend(value_type val) { util::set_bit(null_bitmap_data_, length_); raw_data_[length_++] = val; } @@ -151,9 +153,6 @@ class NumericBuilder : public PrimitiveBuilder { using PrimitiveBuilder::length_; using PrimitiveBuilder::null_bitmap_data_; using PrimitiveBuilder::raw_data_; - - using PrimitiveBuilder::Init; - using PrimitiveBuilder::Resize; }; template <> diff --git a/cpp/src/arrow/types/string.h b/cpp/src/arrow/types/string.h index c5cbe1058c7cf..d2d3c5b6b5a83 100644 --- a/cpp/src/arrow/types/string.h +++ b/cpp/src/arrow/types/string.h @@ -89,11 +89,11 @@ class StringArray : public ListArray { const uint8_t* raw_bytes_; }; -// Array builder +// String builder class StringBuilder : public ListBuilder { public: explicit StringBuilder(MemoryPool* pool, const TypePtr& type) - : ListBuilder(pool, type, std::make_shared(pool, value_type_)) { + : ListBuilder(pool, std::make_shared(pool, value_type_), type) { byte_builder_ = static_cast(value_builder_.get()); } @@ -110,7 +110,6 @@ class StringBuilder : public ListBuilder { } protected: - std::shared_ptr list_builder_; UInt8Builder* byte_builder_; static TypePtr value_type_; diff --git a/cpp/src/arrow/util/buffer.h b/cpp/src/arrow/util/buffer.h index 56532be8070ae..5ef0076953cea 100644 --- a/cpp/src/arrow/util/buffer.h +++ b/cpp/src/arrow/util/buffer.h @@ -23,6 +23,7 @@ #include #include +#include "arrow/util/bit-util.h" #include "arrow/util/macros.h" #include "arrow/util/status.h" @@ -137,26 +138,64 @@ class BufferBuilder { public: explicit BufferBuilder(MemoryPool* pool) : pool_(pool), capacity_(0), size_(0) {} + Status Resize(int32_t elements) { + if (capacity_ == 0) { buffer_ = std::make_shared(pool_); } + capacity_ = elements; + RETURN_NOT_OK(buffer_->Resize(capacity_)); + data_ = buffer_->mutable_data(); + return Status::OK(); + } + Status Append(const uint8_t* data, int length) { - if (capacity_ < length + size_) { - if (capacity_ == 0) { buffer_ = std::make_shared(pool_); } - capacity_ = std::max(MIN_BUFFER_CAPACITY, capacity_); - while (capacity_ < length + size_) { - capacity_ *= 2; - } - RETURN_NOT_OK(buffer_->Resize(capacity_)); - data_ = buffer_->mutable_data(); - } + if (capacity_ < length + size_) { RETURN_NOT_OK(Resize(length + size_)); } + UnsafeAppend(data, length); + return Status::OK(); + } + + template + Status Append(T arithmetic_value) { + static_assert(std::is_arithmetic::value, + "Convenience buffer append only supports arithmetic types"); + return Append(reinterpret_cast(&arithmetic_value), sizeof(T)); + } + + template + Status Append(const T* arithmetic_values, int num_elements) { + static_assert(std::is_arithmetic::value, + "Convenience buffer append only supports arithmetic types"); + return Append( + reinterpret_cast(arithmetic_values), num_elements * sizeof(T)); + } + + // Unsafe methods don't check existing size + void UnsafeAppend(const uint8_t* data, int length) { memcpy(data_ + size_, data, length); size_ += length; - return Status::OK(); + } + + template + void UnsafeAppend(T arithmetic_value) { + static_assert(std::is_arithmetic::value, + "Convenience buffer append only supports arithmetic types"); + UnsafeAppend(reinterpret_cast(&arithmetic_value), sizeof(T)); + } + + template + void UnsafeAppend(const T* arithmetic_values, int num_elements) { + static_assert(std::is_arithmetic::value, + "Convenience buffer append only supports arithmetic types"); + UnsafeAppend( + reinterpret_cast(arithmetic_values), num_elements * sizeof(T)); } std::shared_ptr Finish() { auto result = buffer_; buffer_ = nullptr; + capacity_ = size_ = 0; return result; } + int capacity() { return capacity_; } + int length() { return size_; } private: std::shared_ptr buffer_; diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index 527ce423e7751..fccc5e3085de5 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -18,8 +18,8 @@ #ifndef ARROW_UTIL_LOGGING_H #define ARROW_UTIL_LOGGING_H -#include #include +#include namespace arrow { diff --git a/cpp/src/arrow/util/memory-pool.cc b/cpp/src/arrow/util/memory-pool.cc index fb417e74daf53..961554fe06bcc 100644 --- a/cpp/src/arrow/util/memory-pool.cc +++ b/cpp/src/arrow/util/memory-pool.cc @@ -18,8 +18,8 @@ #include "arrow/util/memory-pool.h" #include -#include #include +#include #include "arrow/util/status.h" diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index d608f8167df65..bf5a22089cdba 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -31,14 +31,15 @@ def test_getitem_NA(self): assert arr[1] is pyarrow.NA def test_list_format(self): - arr = pyarrow.from_pylist([[1], None, [2, 3]]) + arr = pyarrow.from_pylist([[1], None, [2, 3, None]]) result = fmt.array_format(arr) expected = """\ [ [1], NA, [2, - 3] + 3, + NA] ]""" assert result == expected