Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): lazy allocation for upload buffer #11633

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 29 additions & 27 deletions google/cloud/storage/internal/object_write_streambuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ ObjectWriteStreambuf::ObjectWriteStreambuf(Status status)
: last_status_(std::move(status)),
max_buffer_size_(UploadChunkRequest::kChunkSizeQuantum),
span_options_(CurrentOptions()) {
current_ios_buffer_.resize(max_buffer_size_);
auto* pbeg = current_ios_buffer_.data();
auto* pend = pbeg + current_ios_buffer_.size();
setp(pbeg, pend);
current_ios_buffer_.reserve(UploadChunkRequest::kChunkSizeQuantum);
UpdatePutArea();
}

ObjectWriteStreambuf::ObjectWriteStreambuf(
Expand All @@ -54,10 +52,8 @@ ObjectWriteStreambuf::ObjectWriteStreambuf(
hash_validator_(std::move(hash_validator)),
auto_finalize_(auto_finalize),
span_options_(CurrentOptions()) {
current_ios_buffer_.resize(max_buffer_size_);
auto* pbeg = current_ios_buffer_.data();
auto* pend = pbeg + current_ios_buffer_.size();
setp(pbeg, pend);
current_ios_buffer_.reserve(UploadChunkRequest::kChunkSizeQuantum);
UpdatePutArea();
}

void ObjectWriteStreambuf::AutoFlushFinal() {
Expand Down Expand Up @@ -107,20 +103,20 @@ std::streamsize ObjectWriteStreambuf::xsputn(char const* s,
if (!IsOpen()) return traits_type::eof();

auto const actual_size = put_area_size();
if (count + actual_size >= max_buffer_size_) {
if (actual_size == 0) {
FlushRoundChunk({ConstBuffer(s, static_cast<std::size_t>(count))});
} else {
FlushRoundChunk({
ConstBuffer(pbase(), actual_size),
ConstBuffer(s, static_cast<std::size_t>(count)),
});
}
if (!last_status_.ok()) return traits_type::eof();
if (count + actual_size < max_buffer_size_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q. Is there any reason to rewrite the test to avoid any possibility of overflow?

For example, if we know actual_size <= max_buffer_size_, then we might instead say...

  if (count < max_buffer_size_ - actual_size) {

Aside: Should the test be <=?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q. Is there any reason to rewrite the test to avoid any possibility of overflow?

For example, if we know actual_size <= max_buffer_size_, then we might instead say...

  if (count < max_buffer_size_ - actual_size) {

Thanks, done.

Aside: Should the test be <=?

I think it should be <: we want to flush the data when count == max_buffer_size - actual_size because at that point the buffer is large enough to justify the cost of said flush.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It's not "is there space", but rather "is there space plus some". Thanks.

current_ios_buffer_.insert(current_ios_buffer_.end(), s, s + count);
UpdatePutArea();
return count;
}
if (actual_size == 0) {
FlushRoundChunk({ConstBuffer(s, static_cast<std::size_t>(count))});
} else {
std::copy(s, s + count, pptr());
pbump(static_cast<int>(count));
FlushRoundChunk({
ConstBuffer(pbase(), actual_size),
ConstBuffer(s, static_cast<std::size_t>(count)),
});
}
if (!last_status_.ok()) return traits_type::eof();
return count;
}

Expand All @@ -131,8 +127,8 @@ ObjectWriteStreambuf::int_type ObjectWriteStreambuf::overflow(int_type ch) {

auto actual_size = put_area_size();
if (actual_size >= max_buffer_size_) Flush();
*pptr() = traits_type::to_char_type(ch);
pbump(1);
current_ios_buffer_.push_back(traits_type::to_char_type(ch));
UpdatePutArea();
return last_status_.ok() ? ch : traits_type::eof();
}

Expand Down Expand Up @@ -163,7 +159,7 @@ void ObjectWriteStreambuf::FlushFinal() {
headers_ = std::move(response->request_metadata);

// Reset the iostream put area with valid pointers, but empty.
current_ios_buffer_.resize(1);
current_ios_buffer_.clear();
auto* pbeg = current_ios_buffer_.data();
setp(pbeg, pbeg);
}
Expand Down Expand Up @@ -213,13 +209,12 @@ void ObjectWriteStreambuf::FlushRoundChunk(ConstBufferSequence buffers) {

// Reset the internal buffer and copy any trailing bytes from `buffers` to
// it.
auto* pbeg = current_ios_buffer_.data();
setp(pbeg, pbeg + current_ios_buffer_.size());
current_ios_buffer_.clear();
PopFrontBytes(buffers, rounded_size);
for (auto const& b : buffers) {
std::copy(b.begin(), b.end(), pptr());
pbump(static_cast<int>(b.size()));
current_ios_buffer_.insert(current_ios_buffer_.end(), b.begin(), b.end());
}
UpdatePutArea();

metadata_ = std::move(response->payload);
committed_size_ = response->committed_size.value_or(0);
Expand All @@ -240,6 +235,13 @@ void ObjectWriteStreambuf::FlushRoundChunk(ConstBufferSequence buffers) {
}
}

void ObjectWriteStreambuf::UpdatePutArea() {
auto* pbeg = current_ios_buffer_.data();
auto const n = current_ios_buffer_.size();
setp(pbeg, pbeg + n);
if (!current_ios_buffer_.empty()) pbump(static_cast<int>(n));
}

} // namespace internal
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/storage/internal/object_write_streambuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class ObjectWriteStreambuf : public std::basic_streambuf<char> {
/// The current used bytes in the put area (aka current_ios_buffer_)
std::size_t put_area_size() const { return pptr() - pbase(); }

void UpdatePutArea();

std::shared_ptr<RawClient> client_;
ResumableUploadRequest request_;
Status last_status_;
Expand Down