Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-100: [C++] Computing RowBatch size #61

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,19 @@ class RowBatchWriter {
}

// This must be called after invoking AssemblePayload
int64_t DataHeaderSize() {
// TODO(wesm): In case it is needed, compute the upper bound for the size
// of the buffer containing the flatbuffer data header.
return 0;
Status DataHeaderSize(int64_t* size) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be good to add a verb to this function name (GetDataHeaderSize)

// emulates the behavior of Write without actually writing
int64_t offset = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two possible suggestions for reducing code duplication:

  1. Just pass the MockMemory object to write in GetRowBatchSize() and use that to compute the total size (this makes the existing unit test a little less useful, you would actually want to verify the entire span of writes done by the Write method) and would change my comment below about where to place the mock memory object.
  2. change Write to WritePrivate, and have an additional out parameter that is either data header size (int) or the buffer containing the written data header (it would still require passing in the MockMemory object.

do you like either of these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review! (1) is a good solution, probably I'll go with that! Expect to update the PR tonight or some time tomorrow.

for (size_t i = 0; i < buffers_.size(); ++i) {
const Buffer* buffer = buffers_[i].get();
offset += buffer->size();
buffer_meta_.push_back(flatbuf::Buffer(0, 0, 0));
}
std::shared_ptr<Buffer> data_header;
RETURN_NOT_OK(WriteDataHeader(
batch_->num_rows(), offset, field_nodes_, buffer_meta_, &data_header));
*size = data_header->size();
return Status::OK();
}

// Total footprint of buffers. This must be called after invoking
Expand All @@ -179,6 +188,15 @@ Status WriteRowBatch(
RETURN_NOT_OK(serializer.AssemblePayload());
return serializer.Write(dst, position, header_offset);
}

Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
RowBatchWriter serializer(batch);
RETURN_NOT_OK(serializer.AssemblePayload());
RETURN_NOT_OK(serializer.DataHeaderSize(size));
*size += serializer.TotalBytes();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning behind incrementing vs assigning here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size is the sum of data header size (set on the previous line) and TotalBytes which is the buffer footprint. TotalBytes might be better named as TotalBufferBytes because it isn't obvious.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I'm going to rename it for the PR!

return Status::OK();
}

// ----------------------------------------------------------------------
// Row batch read path

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Status WriteRowBatch(
// Compute the precise number of bytes needed in a contiguous memory segment to
// write the row batch. This involves generating the complete serialized
// Flatbuffers metadata.
int64_t GetRowBatchSize(const RowBatch* batch);
Status GetRowBatchSize(const RowBatch* batch, int64_t* size);

// ----------------------------------------------------------------------
// "Read" path; does not copy data if the MemorySource does not
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/ipc/ipc-adapter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,21 @@ TEST_F(TestWriteRowBatch, IntegerRoundTrip) {
int64_t header_location;
ASSERT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location));

MockMemorySource mock_source(1 << 16);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably pays to move the verification of GetRowBatchSize to a separate unit test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

int64_t mock_header_location;
ASSERT_OK(WriteRowBatch(&mock_source, &batch, 0, &mock_header_location));

std::shared_ptr<RowBatchReader> result;
ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &result));

std::shared_ptr<RowBatch> batch_result;
ASSERT_OK(result->GetRowBatch(schema, &batch_result));
EXPECT_EQ(batch.num_rows(), batch_result->num_rows());

int64_t size;
ASSERT_OK(GetRowBatchSize(batch_result.get(), &size));
EXPECT_EQ(mock_source.Position(), size);

for (int i = 0; i < batch.num_columns(); ++i) {
EXPECT_TRUE(batch.column(i)->Equals(batch_result->column(i))) << i
<< batch.column_name(i);
Expand Down
24 changes: 24 additions & 0 deletions cpp/src/arrow/ipc/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,29 @@ Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t
return Status::OK();
}

MockMemorySource::MockMemorySource(int64_t size) : size_(size) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per @emkornfield note below, we don't want to put unnecessary compiled symbols in the library object code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid code duplication, I changed the implementation using suggestion (1) of @emkornfield; I think this means that MockMemorySource needs to be exported in the object code, please let me know if there is a way around this!


Status MockMemorySource::Close() {
return Status::OK();
}

Status MockMemorySource::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
return Status::OK();
}

Status MockMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
pos_ = position + nbytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worth clarifying in the header, that pos_ is always a function of the last Write call. A more complete (but possibly not necessary right now) would actually track all non-overlapping writes to to verify the actual number of bytes written)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Position() now returns the total length written

return Status::OK();
}

int64_t MockMemorySource::Size() const {
return size_;
}

int64_t MockMemorySource::Position() const {
return pos_;
}

} // namespace ipc
} // namespace arrow
20 changes: 20 additions & 0 deletions cpp/src/arrow/ipc/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,26 @@ class MemoryMappedSource : public MemorySource {
std::unique_ptr<Impl> impl_;
};

// A MemorySource that tracks the size of allocations from a memory source
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this probably belongs in test-common.h (along with the implementation, I'm not sure if it is worth creating a new .cc file or just inlining)

class MockMemorySource : public MemorySource {
public:
explicit MockMemorySource(int64_t size);

Status Close() override;

Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;

int64_t Size() const override;

int64_t Position() const;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see note note in the .cc, I think it is worth providing some documentation on what Position means here.


private:
int64_t size_;
int64_t pos_;
};

} // namespace ipc
} // namespace arrow

Expand Down