Skip to content

Commit

Permalink
ARROW-100: [C++] Computing RowBatch size
Browse files Browse the repository at this point in the history
Implement RowBatchWriter::DataHeaderSize and arrow::ipc::GetRowBatchSize. To achieve this, the Flatbuffer metadata is written to a temporary buffer and its size is determined. This commit also adds MockMemorySource, a new MemorySource that tracks the amount of memory written.

Author: Philipp Moritz <pcmoritz@gmail.com>

Author: Philipp Moritz <pcmoritz@gmail.com>

Closes #61 from pcmoritz/rowbatchsize and squashes the following commits:

e95fc5c [Philipp Moritz] fix formating
253c9f0 [Philipp Moritz] rename MockMemorySource methods to reflect better what they are doing
3484458 [Philipp Moritz] add tests for more datatypes
6b798f8 [Philipp Moritz] fix maximum recursion depth
67af8e1 [Philipp Moritz] merge GetRowBatchSize
9b69f12 [Philipp Moritz] factor out GetRowBatchSize test, use MockMemorySource to implement GetRowBatchSize, unify DataHeaderSize and TotalBytes into GetTotalSize
aa48cdf [Philipp Moritz] ARROW-100: [C++] Computing RowBatch size
  • Loading branch information
pcmoritz authored and wesm committed Apr 23, 2016
1 parent 0b472d8 commit a541644
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 15 deletions.
29 changes: 15 additions & 14 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,20 +179,13 @@ class RowBatchWriter {
}

// This must be called after invoking AssemblePayload
int64_t DataHeaderSize() {
// TODO(wesm): In case it is needed, compute the upper bound for the size
// of the buffer containing the flatbuffer data header.
return 0;
}

// Total footprint of buffers. This must be called after invoking
// AssemblePayload
int64_t TotalBytes() {
int64_t total = 0;
for (const std::shared_ptr<Buffer>& buffer : buffers_) {
total += buffer->size();
}
return total;
Status GetTotalSize(int64_t* size) {
// emulates the behavior of Write without actually writing
int64_t data_header_offset;
MockMemorySource source(0);
RETURN_NOT_OK(Write(&source, 0, &data_header_offset));
*size = source.GetExtentBytesWritten();
return Status::OK();
}

private:
Expand All @@ -211,6 +204,14 @@ Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
RETURN_NOT_OK(serializer.AssemblePayload());
return serializer.Write(dst, position, header_offset);
}

Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
RowBatchWriter serializer(batch, kMaxIpcRecursionDepth);
RETURN_NOT_OK(serializer.AssemblePayload());
RETURN_NOT_OK(serializer.GetTotalSize(size));
return Status::OK();
}

// ----------------------------------------------------------------------
// Row batch read path

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
// Compute the precise number of bytes needed in a contiguous memory segment to
// write the row batch. This involves generating the complete serialized
// Flatbuffers metadata.
int64_t GetRowBatchSize(const RowBatch* batch);
Status GetRowBatchSize(const RowBatch* batch, int64_t* size);

// ----------------------------------------------------------------------
// "Read" path; does not copy data if the MemorySource does not
Expand Down
28 changes: 28 additions & 0 deletions cpp/src/arrow/ipc/ipc-adapter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,34 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, &MakeNonNullRowBatch,
&MakeZeroLengthRowBatch, &MakeDeeplyNestedList));

void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
MockMemorySource mock_source(1 << 16);
int64_t mock_header_location;
int64_t size;
ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location));
ASSERT_OK(GetRowBatchSize(batch.get(), &size));
ASSERT_EQ(mock_source.GetExtentBytesWritten(), size);
}

TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) {
std::shared_ptr<RowBatch> batch;

ASSERT_OK(MakeIntRowBatch(&batch));
TestGetRowBatchSize(batch);

ASSERT_OK(MakeListRowBatch(&batch));
TestGetRowBatchSize(batch);

ASSERT_OK(MakeZeroLengthRowBatch(&batch));
TestGetRowBatchSize(batch);

ASSERT_OK(MakeNonNullRowBatch(&batch));
TestGetRowBatchSize(batch);

ASSERT_OK(MakeDeeplyNestedList(&batch));
TestGetRowBatchSize(batch);
}

class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
public:
void SetUp() { pool_ = default_memory_pool(); }
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/ipc/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,30 @@ Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t
return Status::OK();
}

MockMemorySource::MockMemorySource(int64_t size)
: size_(size), extent_bytes_written_(0) {}

Status MockMemorySource::Close() {
return Status::OK();
}

Status MockMemorySource::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
return Status::OK();
}

Status MockMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
extent_bytes_written_ = std::max(extent_bytes_written_, position + nbytes);
return Status::OK();
}

int64_t MockMemorySource::Size() const {
return size_;
}

int64_t MockMemorySource::GetExtentBytesWritten() const {
return extent_bytes_written_;
}

} // namespace ipc
} // namespace arrow
22 changes: 22 additions & 0 deletions cpp/src/arrow/ipc/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ class MemoryMappedSource : public MemorySource {
std::unique_ptr<Impl> impl_;
};

// A MemorySource that tracks the size of allocations from a memory source
class MockMemorySource : public MemorySource {
public:
explicit MockMemorySource(int64_t size);

Status Close() override;

Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;

int64_t Size() const override;

// @return: the smallest number of bytes containing the modified region of the
// MockMemorySource
int64_t GetExtentBytesWritten() const;

private:
int64_t size_;
int64_t extent_bytes_written_;
};

} // namespace ipc
} // namespace arrow

Expand Down

0 comments on commit a541644

Please sign in to comment.