Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-100: [C++] Computing RowBatch size #61

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,20 +179,13 @@ class RowBatchWriter {
}

// This must be called after invoking AssemblePayload
int64_t DataHeaderSize() {
  // TODO(wesm): In case it is needed, compute the upper bound for the size
  // of the buffer containing the flatbuffer data header.
  //
  // Until that is implemented this is a placeholder that always reports zero.
  const int64_t placeholder_size = 0;
  return placeholder_size;
}

// Total footprint of buffers. This must be called after invoking
// AssemblePayload
int64_t TotalBytes() {
int64_t total = 0;
for (const std::shared_ptr<Buffer>& buffer : buffers_) {
total += buffer->size();
}
return total;
Status GetTotalSize(int64_t* size) {
// emulates the behavior of Write without actually writing
int64_t data_header_offset;
MockMemorySource source(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the MockMemorySource is used in the implementation of GetTotalSize. This is OK for now

RETURN_NOT_OK(Write(&source, 0, &data_header_offset));
*size = source.GetExtentBytesWritten();
return Status::OK();
}

private:
Expand All @@ -211,6 +204,14 @@ Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
RETURN_NOT_OK(serializer.AssemblePayload());
return serializer.Write(dst, position, header_offset);
}

// Computes the number of bytes needed to serialize `batch`.
//
// The payload is assembled the same way WriteRowBatch does it, and the writer
// is then asked for its total size. On success the result is stored in `*size`;
// any failure from assembling the payload or computing the size is returned
// to the caller unchanged.
Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
  RowBatchWriter writer(batch, kMaxIpcRecursionDepth);
  RETURN_NOT_OK(writer.AssemblePayload());
  // The final status is forwarded directly, mirroring WriteRowBatch.
  return writer.GetTotalSize(size);
}

// ----------------------------------------------------------------------
// Row batch read path

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
// Compute the precise number of bytes needed in a contiguous memory segment to
// write the row batch. This involves generating the complete serialized
// Flatbuffers metadata.
int64_t GetRowBatchSize(const RowBatch* batch);
Status GetRowBatchSize(const RowBatch* batch, int64_t* size);

// ----------------------------------------------------------------------
// "Read" path; does not copy data if the MemorySource does not
Expand Down
28 changes: 28 additions & 0 deletions cpp/src/arrow/ipc/ipc-adapter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,34 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, &MakeNonNullRowBatch,
&MakeZeroLengthRowBatch, &MakeDeeplyNestedList));

void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
MockMemorySource mock_source(1 << 16);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably pays to move the verification of GetRowBatchSize to a separate unit test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

int64_t mock_header_location;
int64_t size;
ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location));
ASSERT_OK(GetRowBatchSize(batch.get(), &size));
ASSERT_EQ(mock_source.GetExtentBytesWritten(), size);
}

// Checks that GetRowBatchSize agrees with the number of bytes WriteRowBatch
// actually writes, for each of the sample row batches.
//
// TestGetRowBatchSize is a void helper that uses ASSERT_* internally. A fatal
// failure there only returns from the helper, not from this test body, so each
// call is wrapped in ASSERT_NO_FATAL_FAILURE to stop at the first failing batch
// instead of silently continuing with the next one.
TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) {
  std::shared_ptr<RowBatch> batch;

  ASSERT_OK(MakeIntRowBatch(&batch));
  ASSERT_NO_FATAL_FAILURE(TestGetRowBatchSize(batch));

  ASSERT_OK(MakeListRowBatch(&batch));
  ASSERT_NO_FATAL_FAILURE(TestGetRowBatchSize(batch));

  ASSERT_OK(MakeZeroLengthRowBatch(&batch));
  ASSERT_NO_FATAL_FAILURE(TestGetRowBatchSize(batch));

  ASSERT_OK(MakeNonNullRowBatch(&batch));
  ASSERT_NO_FATAL_FAILURE(TestGetRowBatchSize(batch));

  ASSERT_OK(MakeDeeplyNestedList(&batch));
  ASSERT_NO_FATAL_FAILURE(TestGetRowBatchSize(batch));
}

class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
public:
void SetUp() { pool_ = default_memory_pool(); }
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/ipc/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,30 @@ Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t
return Status::OK();
}

MockMemorySource::MockMemorySource(int64_t size)
: size_(size), extent_bytes_written_(0) {}

// Closing the mock source does nothing and always reports success.
Status MockMemorySource::Close() {
return Status::OK();
}

// Reading from the mock source does nothing: no data is produced and the call
// always reports success.
// NOTE(review): `out` is never assigned here, so a caller that dereferences
// the resulting buffer after a "successful" read sees whatever `*out` held
// before the call — confirm no caller relies on reading from this mock.
Status MockMemorySource::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
return Status::OK();
}

// Pretends to write `nbytes` bytes at `position`. Nothing is copied from
// `data`; the source only remembers the furthest end offset
// (position + nbytes) seen across all writes. Always reports success.
Status MockMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
  const int64_t write_end = position + nbytes;
  if (extent_bytes_written_ < write_end) {
    extent_bytes_written_ = write_end;
  }
  return Status::OK();
}

// Returns the size value that was passed to the constructor.
int64_t MockMemorySource::Size() const {
return size_;
}

// Returns the largest (position + nbytes) recorded by Write so far, or 0 if
// Write has not been called.
int64_t MockMemorySource::GetExtentBytesWritten() const {
return extent_bytes_written_;
}

} // namespace ipc
} // namespace arrow
22 changes: 22 additions & 0 deletions cpp/src/arrow/ipc/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ class MemoryMappedSource : public MemorySource {
std::unique_ptr<Impl> impl_;
};

// A MemorySource that stores no data and only tracks the extent of the bytes written to it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this probably belongs in test-common.h (along with the implementation, I'm not sure if it is worth creating a new .cc file or just inlining)

// MemorySource implementation that does not hold any data. Writes are not
// stored; only the furthest end offset written is recorded so that callers can
// learn how many bytes a write sequence would occupy.
class MockMemorySource : public MemorySource {
public:
// @param size: the value reported by Size()
explicit MockMemorySource(int64_t size);

// No-op; always succeeds.
Status Close() override;

// No-op; always succeeds and does not assign `out`.
Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

// Records max(previous extent, position + nbytes); `data` is not read.
Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;

// @return: the size passed to the constructor
int64_t Size() const override;

// @return: the smallest number of bytes containing the modified region of the
// MockMemorySource
int64_t GetExtentBytesWritten() const;

private:
// Nominal size supplied at construction; returned by Size().
int64_t size_;
// Furthest end offset (position + nbytes) passed to Write so far; starts at 0.
int64_t extent_bytes_written_;
};

} // namespace ipc
} // namespace arrow

Expand Down