Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-185: Make padding and alignment for all buffers be 64 bytes #74

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions cpp/src/arrow/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ Status ArrayBuilder::AppendToBitmap(const uint8_t* valid_bytes, int32_t length)
}

Status ArrayBuilder::Init(int32_t capacity) {
capacity_ = capacity;
int32_t to_alloc = util::ceil_byte(capacity) / 8;
null_bitmap_ = std::make_shared<PoolBuffer>(pool_);
RETURN_NOT_OK(null_bitmap_->Resize(to_alloc));
// Buffers might allocate more than necessary to satisfy padding requirements
const int byte_capacity = null_bitmap_->capacity();
capacity_ = capacity;
null_bitmap_data_ = null_bitmap_->mutable_data();
memset(null_bitmap_data_, 0, to_alloc);
memset(null_bitmap_data_, 0, byte_capacity);
return Status::OK();
}

Expand All @@ -60,8 +62,11 @@ Status ArrayBuilder::Resize(int32_t new_bits) {
int32_t old_bytes = null_bitmap_->size();
RETURN_NOT_OK(null_bitmap_->Resize(new_bytes));
null_bitmap_data_ = null_bitmap_->mutable_data();
// The buffer might be overpadded to deal with padding according to the spec
const int32_t byte_capacity = null_bitmap_->capacity();
capacity_ = new_bits;
if (old_bytes < new_bytes) {
memset(null_bitmap_data_ + old_bytes, 0, new_bytes - old_bytes);
memset(null_bitmap_data_ + old_bytes, 0, byte_capacity - old_bytes);
}
return Status::OK();
}
Expand Down
10 changes: 9 additions & 1 deletion cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
} else if (arr->type_enum() == Type::STRUCT) {
// TODO(wesm)
return Status::NotImplemented("Struct type");
} else {
return Status::NotImplemented("Unrecognized type");
}
return Status::OK();
}
Expand Down Expand Up @@ -142,7 +144,13 @@ class RowBatchWriter {
int64_t size = 0;

// The buffer might be null if we are handling zero row lengths.
if (buffer) { size = buffer->size(); }
if (buffer) {
// We use capacity here, because size might not reflect the padding
// requirements of buffers but capacity always should.
size = buffer->capacity();
// check that padding is appropriate
DCHECK_EQ(size % 64, 0);
}
// TODO(wesm): We currently have no notion of shared memory page id's,
// but we've included it in the metadata IDL for when we have it in the
// future. Use page=0 for now
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/ipc/ipc-adapter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,

void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
MockMemorySource mock_source(1 << 16);
int64_t mock_header_location;
int64_t size;
int64_t mock_header_location = -1;
int64_t size = -1;
ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location));
ASSERT_OK(GetRowBatchSize(batch.get(), &size));
ASSERT_EQ(mock_source.GetExtentBytesWritten(), size);
Expand Down Expand Up @@ -270,7 +270,7 @@ TEST_F(RecursionLimits, WriteLimit) {
}

TEST_F(RecursionLimits, ReadLimit) {
int64_t header_location;
int64_t header_location = -1;
std::shared_ptr<Schema> schema;
ASSERT_OK(WriteToMmap(64, true, &header_location, &schema));

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/types/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>

#include "arrow/array.h"
Expand Down Expand Up @@ -113,12 +114,14 @@ class ListBuilder : public ArrayBuilder {
values_(values) {}

Status Init(int32_t elements) override {
DCHECK_LT(elements, std::numeric_limits<int32_t>::max());
RETURN_NOT_OK(ArrayBuilder::Init(elements));
// one more than requested for offsets
return offset_builder_.Resize((elements + 1) * sizeof(int32_t));
}

Status Resize(int32_t capacity) override {
DCHECK_LT(capacity, std::numeric_limits<int32_t>::max());
// one more than requested for offsets
RETURN_NOT_OK(offset_builder_.Resize((capacity + 1) * sizeof(int32_t)));
return ArrayBuilder::Resize(capacity);
Expand Down
7 changes: 1 addition & 6 deletions cpp/src/arrow/types/primitive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ Status PrimitiveBuilder<T>::Init(int32_t capacity) {

int64_t nbytes = type_traits<T>::bytes_required(capacity);
RETURN_NOT_OK(data_->Resize(nbytes));
memset(data_->mutable_data(), 0, nbytes);

raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());
return Status::OK();
Expand All @@ -92,14 +91,10 @@ Status PrimitiveBuilder<T>::Resize(int32_t capacity) {
} else {
RETURN_NOT_OK(ArrayBuilder::Resize(capacity));

int64_t old_bytes = data_->size();
int64_t new_bytes = type_traits<T>::bytes_required(capacity);
const int64_t new_bytes = type_traits<T>::bytes_required(capacity);
RETURN_NOT_OK(data_->Resize(new_bytes));
raw_data_ = reinterpret_cast<value_type*>(data_->mutable_data());

memset(data_->mutable_data() + old_bytes, 0, new_bytes - old_bytes);
}
capacity_ = capacity;
return Status::OK();
}

Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/util/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,31 @@

#include <cstdint>

#include "arrow/util/logging.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

namespace arrow {

namespace {
int64_t RoundUpToMultipleOf64(int64_t num) {
DCHECK_GE(num, 0);
constexpr int64_t round_to = 64;
constexpr int64_t multiple_bitmask = round_to - 1;
int64_t remainder = num & multiple_bitmask;
int rounded = num;
if (remainder) { rounded += 64 - remainder; }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should use round_to here. I'm also pretty sure there is something clever we could do to avoid the condition here, but at the moment I'm blanking on it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this do it?

(num + multiple_bitmask) & ~multiple_bitmask

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that looks right to me. although the performance gains are probably moot given the other condition for overflow.

// handle overflow. This should result in a malloc error upstream
if (rounded > 0) { num = rounded; }
return num;
}
} // namespace

Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) {
data_ = parent->data() + offset;
size_ = size;
parent_ = parent;
capacity_ = size;
}

Buffer::~Buffer() {}
Expand All @@ -48,6 +64,7 @@ PoolBuffer::~PoolBuffer() {
Status PoolBuffer::Reserve(int64_t new_capacity) {
if (!mutable_data_ || new_capacity > capacity_) {
uint8_t* new_data;
new_capacity = RoundUpToMultipleOf64(new_capacity);
if (mutable_data_) {
RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
memcpy(new_data, mutable_data_, size_);
Expand Down
25 changes: 15 additions & 10 deletions cpp/src/arrow/util/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,20 @@ class Status;
// Buffer classes

// Immutable API for a chunk of bytes which may or may not be owned by the
// class instance
// class instance. Buffers have two related notions of length: size and
// capacity. Size is the number of bytes that might have valid data.
// Capacity is the number of bytes that were allocated for the buffer in
// total.
// The following invariant is always true: Size <= Capacity
class Buffer : public std::enable_shared_from_this<Buffer> {
public:
Buffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
Buffer(const uint8_t* data, int64_t size) : data_(data), size_(size), capacity_(size) {}
virtual ~Buffer();

// An offset into data that is owned by another buffer, but we want to be
// able to retain a valid pointer to it even after other shared_ptr's to the
// parent buffer have been destroyed
// TODO(emkornfield) how will this play with 64 byte alignment/padding?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inevitably alignment and padding isn't always going to be a guarantee on in-memory data (of course when data is moved for IPC purposes, that will need to be guaranteed). I suppose then that buffers will need to be able to communicate their alignment/padding for algorithm selection (i.e. can we use the spiffy AVX512 function or not?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to see how use-cases play out. It seems given the current spec, most slicing operations in the general case will need memory allocation anyways. We could likely guarantee alignment/padding by providing a utility method that either allocates slices if it can keep the contract otherwise allocates new underlying data. For now I will put a warning here.

Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size);

std::shared_ptr<Buffer> get_shared_ptr() { return shared_from_this(); }
Expand All @@ -63,6 +68,7 @@ class Buffer : public std::enable_shared_from_this<Buffer> {
(data_ == other.data_ || !memcmp(data_, other.data_, size_)));
}

int64_t capacity() const { return capacity_; }
const uint8_t* data() const { return data_; }

int64_t size() const { return size_; }
Expand All @@ -76,6 +82,7 @@ class Buffer : public std::enable_shared_from_this<Buffer> {
protected:
const uint8_t* data_;
int64_t size_;
int64_t capacity_;

// nullptr by default, but may be set
std::shared_ptr<Buffer> parent_;
Expand Down Expand Up @@ -113,10 +120,7 @@ class ResizableBuffer : public MutableBuffer {
virtual Status Reserve(int64_t new_capacity) = 0;

protected:
ResizableBuffer(uint8_t* data, int64_t size)
: MutableBuffer(data, size), capacity_(size) {}

int64_t capacity_;
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {}
};

// A Buffer whose lifetime is tied to a particular MemoryPool
Expand All @@ -125,8 +129,8 @@ class PoolBuffer : public ResizableBuffer {
explicit PoolBuffer(MemoryPool* pool = nullptr);
virtual ~PoolBuffer();

virtual Status Resize(int64_t new_size);
virtual Status Reserve(int64_t new_capacity);
Status Resize(int64_t new_size) override;
Status Reserve(int64_t new_capacity) override;

private:
MemoryPool* pool_;
Expand All @@ -138,10 +142,11 @@ class BufferBuilder {
public:
explicit BufferBuilder(MemoryPool* pool) : pool_(pool), capacity_(0), size_(0) {}

// Resizes the buffer to the nearest multiple of 64 bytes per Layout.md
Status Resize(int32_t elements) {
if (capacity_ == 0) { buffer_ = std::make_shared<PoolBuffer>(pool_); }
capacity_ = elements;
RETURN_NOT_OK(buffer_->Resize(capacity_));
RETURN_NOT_OK(buffer_->Resize(elements));
capacity_ = buffer_->capacity();
data_ = buffer_->mutable_data();
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/memory-pool-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ TEST(DefaultMemoryPool, MemoryTracking) {

uint8_t* data;
ASSERT_OK(pool->Allocate(100, &data));
EXPECT_EQ(0, reinterpret_cast<uint64_t>(data) % 64);
ASSERT_EQ(100, pool->bytes_allocated());

pool->Free(data, 100);
Expand Down
13 changes: 11 additions & 2 deletions cpp/src/arrow/util/memory-pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "arrow/util/memory-pool.h"

#include <stdlib.h>
#include <cstdlib>
#include <mutex>
#include <sstream>
Expand Down Expand Up @@ -44,14 +45,22 @@ class InternalMemoryPool : public MemoryPool {
};

Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) {
constexpr size_t kAlignment = 64;
std::lock_guard<std::mutex> guard(pool_lock_);
*out = static_cast<uint8_t*>(std::malloc(size));
if (*out == nullptr) {
// TODO(emkornfield) find something compatible with windows
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like _aligned_malloc is the thing on windows https://msdn.microsoft.com/en-us/library/8z34s9c6.aspx. since these have different APIs (the windows API sets errno on failure) we should probably factor the aligned allocation out into a utility function that returns Status.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

speaking of windows, I wonder if we can find a kind soul to set up Appveyor CI for this repo and get the Windows C++ build passing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep. I'll do the refactoring but I won't add the windows code, because I don't have anyway of testing it. I'll open a jira to add windows support via appveyor

const int result = posix_memalign(reinterpret_cast<void**>(out), kAlignment, size);
if (result == ENOMEM) {
std::stringstream ss;
ss << "malloc of size " << size << " failed";
return Status::OutOfMemory(ss.str());
}

if (result == EINVAL) {
std::stringstream ss;
ss << "invalid alignment parameter: " << kAlignment;
return Status::Invalid(ss.str());
}

bytes_allocated_ += size;

return Status::OK();
Expand Down